lucasbru commented on code in PR #22554:
URL: https://github.com/apache/kafka/pull/22554#discussion_r3413698837


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1495,6 +1570,49 @@ private List<String> updateResponseAndGetNonErrorGroupList(
         return groupSet.stream().toList();
     }
 
+    /**
+     * Move plugin failures into {@code deletableGroupResults} and return the group ids
+     * that should still proceed to tombstoning.
+     *
+     * <p>Per KIP-1331, {@code GROUP_DELETION_FAILED} and the per-group {@code ErrorMessage}
+     * field were introduced in DeleteGroups v3. For older clients we downgrade the error to
+     * {@code UNKNOWN_SERVER_ERROR} with no message — matching the convention used by KIP-1043
+     * for new error codes that pre-existing request versions cannot interpret. Errors that
+     * predate this KIP (e.g. NOT_COORDINATOR surfaced from the runtime via the exceptionally
+     * branch above) are passed through unchanged.
+     */
+    private static List<String> filterStreamsTopologyErrors(

Review Comment:
   This method does two unrelated things: filtering (partitioning the group ids into those that proceed to tombstoning and those that failed) and request-version management (the GROUP_DELETION_FAILED -> UNKNOWN_SERVER_ERROR downgrade, with a null message, for requestVersion < 3). The version handling is the only reason it needs the requestVersion param.
   
   I suggest splitting them. Keep filterStreamsTopologyErrors as a pure, version-agnostic filter that adds the raw ApiError to deletableGroupResults and returns the retained group ids (dropping the requestVersion param), and move the v<3 downgrade up into KafkaApis.handleDeleteGroupsRequest, where requestVersion is already in scope and the rest of the DeleteGroups response shaping lives. That is also where new error codes are version-gated elsewhere (KIP-1043 precedent), and it keeps the coordinator version-agnostic like the rest of the delete pipeline.
   
   The effectiveMessage = null assignment then drops out on its own: ErrorMessage is versions 3+ and ignorable, so it is already stripped for old clients at the serialization layer. Only the error code actually needs gating.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator calls
+ * into — heartbeat post-processing, the push RPC, and the {@code DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is rebuilt from
+ * scratch on broker restart, and the persisted {@code StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+    private final Logger log;
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public TopologyDescriptionManager(
+        LogContext logContext,
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.log = logContext.logger(TopologyDescriptionManager.class);
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the topology description plugin is configured, the group
+     * has resolved to a topology epoch, that epoch is neither stored nor permanently
+     * failed at the plugin, no back-off is in effect for this epoch, and the response
+     * does not carry a {@code STALE_TOPOLOGY} status (the member would just be told to
+     * catch up first). When the response already carries an error code we leave it
+     * alone.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId
+    ) {
+        if (plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is not already
+        // in effect for this epoch, so two concurrent heartbeats for the same group cannot
+        // both arm the back-off and double the window beyond its intended length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Reject the request synchronously when no plugin is configured or the request fails
+     * basic structural validation. Returns the response to send back to the client, or
+     * empty when the request is accepted for further processing. The caller is expected
+     * to have already short-circuited on a non-active coordinator.
+     */
+    public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> preCheckTopologyDescriptionUpdate(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        if (plugin.isEmpty()) {
+            return Optional.of(errorResponse(
+                Errors.UNSUPPORTED_VERSION,
+                "The broker has no streams group topology description plugin configured."
+            ));
+        }
+        if (request.memberId() == null || request.memberId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId can't be empty."));
+        }
+        if (request.groupId() == null || request.groupId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId can't be empty."));
+        }
+        if (request.topologyDescription() == null) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "TopologyDescription can't be null."));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire payload, call
+     * the plugin, persist the outcome, and centralize back-off state mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin failure
+     * (including a metadata-record write that fails after a successful plugin call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an idempotent
+     * re-push.
+     */
+    public CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message());
+                });
+                case TRANSIENT -> CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {
+                    backoff.clear(groupId);
+                } else {
+                    backoff.armOrExtend(groupId, pushedEpoch);
+                }
+            });
+    }
+
+    /**
+     * Call {@code plugin.deleteTopology} for every supplied group id that is a streams
+     * group carrying a stored topology description. Returns a per-group map of failures
+     * keyed by group id; groups absent from the map either had no plugin state or the
+     * plugin call succeeded. Empty (no-op) when no plugin is configured. Used by the
+     * service-level {@code DeleteGroups} flow to gate tombstoning on plugin success.
+     */
+    public CompletableFuture<Map<String, ApiError>> deleteBeforeGroupDelete(
+        TopicPartition topicPartition,
+        List<String> groupIds
+    ) {
+        if (plugin.isEmpty()) {
+            return CompletableFuture.completedFuture(Map.of());
+        }
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-pre-delete",
+                topicPartition,
+                (coordinator, lastCommittedOffset) ->
+                    coordinator.streamsGroupsWithStoredTopologyDescription(groupIds, lastCommittedOffset))
+            .thenCompose(groupsWithStored -> {
+                if (groupsWithStored.isEmpty()) {
+                    return CompletableFuture.completedFuture(Map.<String, ApiError>of());
+                }
+                Map<String, CompletableFuture<Throwable>> calls = new HashMap<>(groupsWithStored.size());
+                for (String groupId : groupsWithStored) {
+                    CompletableFuture<Throwable> outcome;
+                    try {
+                        outcome = p.deleteTopology(groupId).handle((unused, throwable) -> throwable);
+                    } catch (Throwable t) {
+                        outcome = CompletableFuture.completedFuture(t);
+                    }
+                    calls.put(groupId, outcome);
+                }
+                CompletableFuture<?>[] all = calls.values().toArray(new CompletableFuture<?>[0]);
+                return CompletableFuture.allOf(all).thenApply(unused -> {
+                    Map<String, ApiError> failures = new HashMap<>();
+                    calls.forEach((groupId, throwableFuture) -> {
+                        Throwable t = throwableFuture.join();
+                        if (t != null) {
+                            Throwable cause = t instanceof CompletionException && t.getCause() != null
+                                ? t.getCause() : t;
+                            log.warn("Topology description plugin failed to delete topology for streams group {}.",
+                                groupId, cause);
+                            backoff.clear(groupId);
+                            failures.put(groupId, new ApiError(Errors.GROUP_DELETION_FAILED, cause.getMessage()));
+                        } else {
+                            backoff.clear(groupId);
+                        }
+                    });
+                    return failures;
+                });
+            })
+            .exceptionally(exception -> {
+                // A failure to read the streams group state (NOT_COORDINATOR etc.) is treated
+                // the same as a plugin failure: every retained group is held back from
+                // tombstoning so the caller can retry. The downstream chain prefers the more
+                // specific error from the runtime over GROUP_DELETION_FAILED.
+                Errors error = Errors.forException(exception);

Review Comment:
   Two things on the back-off handling here.
   
   (1) Lifecycle: the success/normal path clears the back-off for each group, but this exceptionally branch (read-op failure) doesn't, so a group with an armed back-off keeps a stale entry. More generally, the back-off map is only ever cleared from this class's call sites, so entries leak for any group removed by a path other than this hook (expiry, partition unload, tombstone-via-replay). Over a long-running broker that is unbounded growth keyed by dead groupIds. Back-off eviction probably wants to hang off the group's real lifecycle teardown rather than the delete flow.
   
   (2) While here, the gather shape above is heavier than needed: we store CompletableFuture<Throwable> keyed by groupId (the .handle throws the groupId away), then re-walk the map and join each future to rebuild the failures map. Having each future produce its own keyed result (do the clear/log/ApiError inside the .handle and return a Map.entry, or null, per group, then allOf + collect) keeps the single-threaded aggregation but drops the separate calls map and the throwaway-then-rebuild of the groupId. That restructuring also naturally moves backoff.clear into the per-future handle, which fixes the skipped-clear in (1).
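
   One possible shape for the gather in (2), as a self-contained sketch. deleteAll and its deleteTopology function parameter are hypothetical, and plain String messages stand in for ApiError. In the real method the marked spot would hold backoff.clear(groupId) and the log.warn.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class KeyedGatherSketch {
    // Each per-group future yields its own keyed failure entry, or null on success,
    // so there is no separate groupId -> future map to walk a second time.
    public static CompletableFuture<Map<String, String>> deleteAll(
        List<String> groupIds,
        Function<String, CompletableFuture<Void>> deleteTopology  // hypothetical plugin call
    ) {
        List<CompletableFuture<Map.Entry<String, String>>> calls = new ArrayList<>();
        for (String groupId : groupIds) {
            CompletableFuture<Void> call;
            try {
                call = deleteTopology.apply(groupId);
            } catch (Throwable thrown) {
                // A synchronous throw is folded into the same failure path.
                call = CompletableFuture.failedFuture(thrown);
            }
            calls.add(call.handle((unused, error) -> {
                // Per-group side effects (back-off clear, logging) would go here.
                if (error == null) {
                    return null;
                }
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                    ? error.getCause() : error;
                // Map.entry rejects null values, hence String.valueOf in this sketch.
                return Map.entry(groupId, String.valueOf(cause.getMessage()));
            }));
        }
        return CompletableFuture.allOf(calls.toArray(new CompletableFuture<?>[0]))
            .thenApply(unused -> {
                Map<String, String> failures = new HashMap<>();
                for (CompletableFuture<Map.Entry<String, String>> done : calls) {
                    Map.Entry<String, String> entry = done.join();  // already complete
                    if (entry != null) {
                        failures.put(entry.getKey(), entry.getValue());
                    }
                }
                return failures;
            });
    }
}
```

   The final collection still runs once, after allOf, so the failures map is only ever touched by a single thread.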



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+            .exceptionally(exception -> {
+                // A failure to read the streams group state (NOT_COORDINATOR etc.) is treated
+                // the same as a plugin failure: every retained group is held back from
+                // tombstoning so the caller can retry. The downstream chain prefers the more
+                // specific error from the runtime over GROUP_DELETION_FAILED.
+                Errors error = Errors.forException(exception);
+                Map<String, ApiError> failures = new HashMap<>();
+                for (String groupId : groupIds) {
+                    failures.put(groupId, new ApiError(error, exception.getMessage()));

Review Comment:
   `exception` here is the `CompletionException` from the chain, so
   `exception.getMessage()` returns the cause's `toString()`, and the client
   ends up with an ErrorMessage like
   "org.apache.kafka.common.errors.NotCoordinatorException: ...".
   `ApiError.fromThrowable(exception)` unwraps the cause and also suppresses
   the message for UNKNOWN_SERVER_ERROR; it is what the sibling DeleteGroups
   handlers use.
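
   To illustrate the wrapping with JDK classes only (a minimal sketch; the
   class and method names are hypothetical, and an `IllegalStateException`
   stands in for `NotCoordinatorException`): a failure that passes through a
   dependent stage reaches `exceptionally` wrapped in a `CompletionException`,
   whose message is the cause's `toString()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; it does not use any Kafka types.
final class CompletionMessageDemo {

    // Message observed when exceptionally() calls getMessage() on what it receives.
    static String rawMessage(RuntimeException failure) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> chained = source
            .thenApply(value -> value)              // dependent stage: failure arrives wrapped
            .exceptionally(Throwable::getMessage);
        source.completeExceptionally(failure);
        return chained.join();
    }

    // Message observed when the CompletionException is unwrapped first.
    static String unwrappedMessage(RuntimeException failure) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> chained = source
            .thenApply(value -> value)
            .exceptionally(t -> {
                Throwable cause = t instanceof CompletionException && t.getCause() != null
                    ? t.getCause() : t;
                return cause.getMessage();
            });
        source.completeExceptionally(failure);
        return chained.join();
    }

    public static void main(String[] args) {
        RuntimeException failure = new IllegalStateException("not coordinator");
        System.out.println(rawMessage(failure));       // class name plus message
        System.out.println(unwrappedMessage(failure)); // message only
    }
}
```

   In the real code the unwrap would come from `ApiError.fromThrowable`
   rather than a hand-written ternary; the ternary is only there to keep the
   sketch free of Kafka dependencies.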



##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
     public record Source(String name, Set<String> topics, Set<String> successors) implements Node {
         public Source {
             Objects.requireNonNull(name, "name");
-            topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+            topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+            successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }
     }
 
     public record Processor(String name, Set<String> stores, Set<String> successors) implements Node {
         public Processor {
             Objects.requireNonNull(name, "name");
-            stores = Set.copyOf(Objects.requireNonNull(stores, "stores"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+            stores = Collections.unmodifiableSet(Objects.requireNonNull(stores, "stores"));
+            successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }

Review Comment:
   Same immutability point as the Source node above, and it is a #22552 change 
— let's resolve it on that PR.
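
   The #22552 thread is not quoted in this message, so what exactly the
   immutability point covers is an assumption. One difference between the two
   calls that is certain from the JDK: `Set.copyOf` takes a defensive copy,
   while `Collections.unmodifiableSet` returns a read-only view that still
   reflects later changes to the caller's set. A minimal sketch with
   hypothetical record names:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical demo; CopiedNode and WrappedNode are not the Kafka records.
final class SetWrappingDemo {

    // Compact constructor that copies, as on the removed ("-") lines.
    record CopiedNode(Set<String> successors) {
        CopiedNode {
            successors = Set.copyOf(successors);
        }
    }

    // Compact constructor that wraps, as on the added ("+") lines.
    record WrappedNode(Set<String> successors) {
        WrappedNode {
            successors = Collections.unmodifiableSet(successors);
        }
    }

    // Size of the set held by the record after the caller mutates its own set.
    static int sizeAfterCallerMutation(boolean copy) {
        Set<String> callerSet = new HashSet<>(Set.of("a"));
        Set<String> held = copy
            ? new CopiedNode(callerSet).successors()
            : new WrappedNode(callerSet).successors();
        callerSet.add("b"); // mutation happens after the record was built
        return held.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterCallerMutation(true));  // copy is unaffected
        System.out.println(sizeAfterCallerMutation(false)); // view sees the new element
    }
}
```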



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator 
calls
+ * into — heartbeat post-processing, the push RPC, and the {@code 
DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+    private final Logger log;
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public TopologyDescriptionManager(
+        LogContext logContext,
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.log = logContext.logger(TopologyDescriptionManager.class);
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the topology description plugin is configured, 
the group
+     * has resolved to a topology epoch, that epoch is neither stored nor 
permanently
+     * failed at the plugin, no back-off is in effect for this epoch, and the 
response
+     * does not carry a {@code STALE_TOPOLOGY} status (the member would just 
be told to
+     * catch up first). When the response already carries an error code we 
leave it
+     * alone.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId
+    ) {
+        if (plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Reject the request synchronously when no plugin is configured or the 
request fails
+     * basic structural validation. Returns the response to send back to the 
client, or
+     * empty when the request is accepted for further processing. The caller 
is expected
+     * to have already short-circuited on a non-active coordinator.
+     */
+    public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> 
preCheckTopologyDescriptionUpdate(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        if (plugin.isEmpty()) {
+            return Optional.of(errorResponse(
+                Errors.UNSUPPORTED_VERSION,
+                "The broker has no streams group topology description plugin 
configured."
+            ));
+        }
+        if (request.memberId() == null || request.memberId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId 
can't be empty."));
+        }
+        if (request.groupId() == null || request.groupId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId 
can't be empty."));
+        }
+        if (request.topologyDescription() == null) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, 
"TopologyDescription can't be null."));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {
+                    backoff.clear(groupId);
+                } else {
+                    backoff.armOrExtend(groupId, pushedEpoch);
+                }
+            });

Review Comment:
   Good point — arming the back-off on a routing failure (NOT_COORDINATOR 
etc.), where no group decision was actually made, does leave a stale 
broker-wide entry. This whenComplete is the push path from #22552, so it would 
be fixed there; it also ties into the broader back-off-lifecycle issue (entries 
are only ever cleared from this class).
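
   A JDK-only sketch of why the default matters (all names are hypothetical
   stand-ins, and the future chain is reduced to two stages): `whenComplete`
   also runs when an early stage fails, so a holder that defaults to ARM is
   acted on even though no later stage ever ran.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo; it mirrors only the shape of the chain, not the Kafka code.
final class BackoffDefaultDemo {

    enum BackoffAction { ARM, CLEAR }

    // Returns the action whenComplete would act on when the first stage fails.
    static BackoffAction actionWhenFirstStageFails() {
        AtomicReference<BackoffAction> action = new AtomicReference<>(BackoffAction.ARM);
        AtomicReference<BackoffAction> applied = new AtomicReference<>();

        CompletableFuture<Void> validate = new CompletableFuture<>();
        validate
            .thenApply(unused -> {
                // Skipped entirely when validate fails, so CLEAR is never set.
                action.set(BackoffAction.CLEAR);
                return "ok";
            })
            .whenComplete((response, throwable) -> applied.set(action.get()));

        // Stand-in for a routing failure such as NOT_COORDINATOR.
        validate.completeExceptionally(new IllegalStateException("routing failure"));
        return applied.get();
    }

    public static void main(String[] args) {
        System.out.println(actionWhenFirstStageFails()); // the default is what gets applied
    }
}
```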



##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
     public record Source(String name, Set<String> topics, Set<String> successors) implements Node {
         public Source {
             Objects.requireNonNull(name, "name");
-            topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+            topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+            successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }
     }
 
     public record Processor(String name, Set<String> stores, Set<String> successors) implements Node {
         public Processor {
             Objects.requireNonNull(name, "name");
-            stores = Set.copyOf(Objects.requireNonNull(stores, "stores"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+            stores = Collections.unmodifiableSet(Objects.requireNonNull(stores, "stores"));
+            successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }
     }
 
     public record Sink(String name, Optional<String> topic, Set<String> successors) implements Node {
         public Sink {
             Objects.requireNonNull(name, "name");
             Objects.requireNonNull(topic, "topic");
-            successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+            successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }

Review Comment:
   Same as Source/Processor — belongs to the #22552 discussion.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1434,9 +1501,17 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
                     return CompletableFuture.completedFuture(deletableGroupResults);
                 }
 
-                return handleDeleteGroups(context, topicPartition, retainedGroupIds)
-                    .whenComplete((resp, __) -> resp.forEach(result -> deletableGroupResults.add(result.duplicate())))
-                    .thenApply(__ -> deletableGroupResults);
+                return topologyDescriptionManager.deleteBeforeGroupDelete(topicPartition, retainedGroupIds)

Review Comment:
   The new `filterStreamsTopologyErrors` + `thenCompose` stage runs
   synchronously after `deleteBeforeGroupDelete`'s own `exceptionally`, and
   nothing guards this stage with an `exceptionally` of its own (the original
   comment "deleteShareGroups has its own exceptionally block, so we don't
   need one here" no longer holds for the new code). If
   `filterStreamsTopologyErrors` throws, the per-partition future completes
   exceptionally, `FutureUtils.combineFutures` `join()`s it, and the entire
   DeleteGroups response fails, including groups on other partitions that
   succeeded. One concrete throw path is the
   `new ArrayList<>(groupIds.size() - streamsErrMap.size())` capacity: if the
   subset invariant ever breaks, the capacity goes negative and the
   constructor throws IllegalArgumentException.
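
   A JDK-only sketch of the failure mode. `FutureUtils.combineFutures` is not
   reproduced here; `CompletableFuture.allOf` followed by `join()` is used as
   a stand-in on the assumption that it propagates a failed input the same
   way. All class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo of an unguarded stage poisoning a combined result.
final class UnguardedStageDemo {

    // Mirrors the capacity expression: a negative capacity makes ArrayList throw
    // IllegalArgumentException.
    static List<String> retained(int total, int failed) {
        return new ArrayList<>(total - failed);
    }

    // True when joining the combined future fails.
    static boolean combinedFails(boolean guarded) {
        CompletableFuture<List<String>> healthy =
            CompletableFuture.completedFuture(List.of("group-a"));

        // Stage that throws because the "subset" invariant is broken (2 > 1).
        CompletableFuture<List<String>> broken =
            CompletableFuture.completedFuture("ignored").thenApply(unused -> retained(1, 2));
        if (guarded) {
            // A local exceptionally keeps the failure inside this partition.
            broken = broken.exceptionally(t -> List.<String>of());
        }

        try {
            CompletableFuture.allOf(healthy, broken).join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(combinedFails(false)); // one bad stage fails the whole join
        System.out.println(combinedFails(true));  // guarded stage lets the join succeed
    }
}
```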



##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
     public record Source(String name, Set<String> topics, Set<String> successors) implements Node {
         public Source {
             Objects.requireNonNull(name, "name");
-            topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
-            successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+            topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+            successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
         }

Review Comment:
   This POJO change is from #22552 (this PR is stacked on it, so the line shows 
up in the diff here too), and there is already a thread on it there. Let's keep 
the discussion on #22552 rather than duplicating it in 3/3.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator 
calls
+ * into — heartbeat post-processing, the push RPC, and the {@code 
DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+    private final Logger log;
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public TopologyDescriptionManager(
+        LogContext logContext,
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.log = logContext.logger(TopologyDescriptionManager.class);
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the topology description plugin is configured, 
the group
+     * has resolved to a topology epoch, that epoch is neither stored nor 
permanently
+     * failed at the plugin, no back-off is in effect for this epoch, and the 
response
+     * does not carry a {@code STALE_TOPOLOGY} status (the member would just 
be told to
+     * catch up first). When the response already carries an error code we 
leave it
+     * alone.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId
+    ) {
+        if (plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Reject the request synchronously when no plugin is configured or the 
request fails
+     * basic structural validation. Returns the response to send back to the 
client, or
+     * empty when the request is accepted for further processing. The caller 
is expected
+     * to have already short-circuited on a non-active coordinator.
+     */
+    public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> 
preCheckTopologyDescriptionUpdate(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        if (plugin.isEmpty()) {
+            return Optional.of(errorResponse(
+                Errors.UNSUPPORTED_VERSION,
+                "The broker has no streams group topology description plugin 
configured."
+            ));
+        }
+        if (request.memberId() == null || request.memberId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId 
can't be empty."));
+        }
+        if (request.groupId() == null || request.groupId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId 
can't be empty."));
+        }
+        if (request.topologyDescription() == null) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, 
"TopologyDescription can't be null."));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {
+                    backoff.clear(groupId);
+                } else {
+                    backoff.armOrExtend(groupId, pushedEpoch);
+                }
+            });
+    }
+
+    /**
+     * Call {@code plugin.deleteTopology} for every supplied group id that is 
a streams
+     * group carrying a stored topology description. Returns a per-group map 
of failures
+     * keyed by group id; groups absent from the map either had no plugin 
state or the
+     * plugin call succeeded. Empty (no-op) when no plugin is configured. Used 
by the
+     * service-level {@code DeleteGroups} flow to gate tombstoning on plugin 
success.
+     */
+    public CompletableFuture<Map<String, ApiError>> deleteBeforeGroupDelete(
+        TopicPartition topicPartition,
+        List<String> groupIds
+    ) {
+        if (plugin.isEmpty()) {
+            return CompletableFuture.completedFuture(Map.of());
+        }
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-pre-delete",
+                topicPartition,
+                (coordinator, lastCommittedOffset) ->
+                    
coordinator.streamsGroupsWithStoredTopologyDescription(groupIds, 
lastCommittedOffset))
+            .thenCompose(groupsWithStored -> {
+                if (groupsWithStored.isEmpty()) {
+                    return CompletableFuture.completedFuture(Map.<String, 
ApiError>of());
+                }
+                Map<String, CompletableFuture<Throwable>> calls = new 
HashMap<>(groupsWithStored.size());
+                for (String groupId : groupsWithStored) {
+                    CompletableFuture<Throwable> outcome;
+                    try {
+                        outcome = p.deleteTopology(groupId).handle((unused, 
throwable) -> throwable);
+                    } catch (Throwable t) {
+                        outcome = CompletableFuture.completedFuture(t);
+                    }
+                    calls.put(groupId, outcome);
+                }
+                CompletableFuture<?>[] all = calls.values().toArray(new 
CompletableFuture<?>[0]);
+                return CompletableFuture.allOf(all).thenApply(unused -> {
+                    Map<String, ApiError> failures = new HashMap<>();
+                    calls.forEach((groupId, throwableFuture) -> {
+                        Throwable t = throwableFuture.join();
+                        if (t != null) {
+                            Throwable cause = t instanceof CompletionException 
&& t.getCause() != null

Review Comment:
   Same CompletionException unwrap as in invokePluginSetTopology — this is the 
second occurrence. Errors.maybeUnwrapException(t) already does exactly this 
(and also handles ExecutionException), so both sites can just call it rather 
than hand-rolling the ternary.
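
   For illustration, a minimal standalone sketch of what sharing the unwrap buys at both call sites. The `unwrap` helper here is a hypothetical local stand-in (it mirrors the ternary in the diff and additionally covers ExecutionException); it is not the Kafka implementation, and the class and method names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class UnwrapSketch {

    // Hypothetical local stand-in for a shared helper: strips one layer of
    // CompletionException/ExecutionException when a cause is present.
    static Throwable unwrap(Throwable t) {
        boolean isWrapper = t instanceof CompletionException || t instanceof ExecutionException;
        return isWrapper && t.getCause() != null ? t.getCause() : t;
    }

    // Same shape as the delete path in the diff: fold a future into the
    // Throwable it failed with, or null when it completed normally.
    static <T> Throwable failureOf(CompletableFuture<T> future) {
        return future.<Throwable>handle((unused, throwable) -> throwable).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> pluginCall = new CompletableFuture<>();
        pluginCall.completeExceptionally(new IllegalStateException("plugin unavailable"));

        // A dependent stage reports the failure wrapped in CompletionException,
        // which is why the call sites need to unwrap before logging or mapping.
        Throwable seen = failureOf(pluginCall.thenApply(value -> value));
        Throwable cause = unwrap(seen);

        System.out.println(seen.getClass().getSimpleName() + " -> " + cause.getClass().getSimpleName());
    }
}
```

   With one helper, both the set and delete paths would log and map the same underlying cause instead of each carrying its own copy of the ternary.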



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator 
calls
+ * into — heartbeat post-processing, the push RPC, and the {@code 
DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+    private final Logger log;
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public TopologyDescriptionManager(
+        LogContext logContext,
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.log = logContext.logger(TopologyDescriptionManager.class);
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the topology description plugin is configured, 
the group
+     * has resolved to a topology epoch, that epoch is neither stored nor 
permanently
+     * failed at the plugin, no back-off is in effect for this epoch, and the 
response
+     * does not carry a {@code STALE_TOPOLOGY} status (the member would just 
be told to
+     * catch up first). When the response already carries an error code we 
leave it
+     * alone.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId
+    ) {
+        if (plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Reject the request synchronously when no plugin is configured or the 
request fails
+     * basic structural validation. Returns the response to send back to the 
client, or
+     * empty when the request is accepted for further processing. The caller 
is expected
+     * to have already short-circuited on a non-active coordinator.
+     */
+    public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> 
preCheckTopologyDescriptionUpdate(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        if (plugin.isEmpty()) {
+            return Optional.of(errorResponse(
+                Errors.UNSUPPORTED_VERSION,
+                "The broker has no streams group topology description plugin 
configured."
+            ));
+        }
+        if (request.memberId() == null || request.memberId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId 
can't be empty."));
+        }
+        if (request.groupId() == null || request.groupId().isEmpty()) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId 
can't be empty."));
+        }
+        if (request.topologyDescription() == null) {
+            return Optional.of(errorResponse(Errors.INVALID_REQUEST, 
"TopologyDescription can't be null."));
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {
+                    backoff.clear(groupId);
+                } else {
+                    backoff.armOrExtend(groupId, pushedEpoch);
+                }
+            });
+    }
+
+    /**
+     * Call {@code plugin.deleteTopology} for every supplied group id that is 
a streams
+     * group carrying a stored topology description. Returns a per-group map 
of failures
+     * keyed by group id; groups absent from the map either had no plugin 
state or the
+     * plugin call succeeded. Empty (no-op) when no plugin is configured. Used 
by the
+     * service-level {@code DeleteGroups} flow to gate tombstoning on plugin 
success.
+     */
+    public CompletableFuture<Map<String, ApiError>> deleteBeforeGroupDelete(
+        TopicPartition topicPartition,
+        List<String> groupIds
+    ) {
+        if (plugin.isEmpty()) {
+            return CompletableFuture.completedFuture(Map.of());
+        }
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-pre-delete",
+                topicPartition,
+                (coordinator, lastCommittedOffset) ->
+                    
coordinator.streamsGroupsWithStoredTopologyDescription(groupIds, 
lastCommittedOffset))
+            .thenCompose(groupsWithStored -> {
+                if (groupsWithStored.isEmpty()) {
+                    return CompletableFuture.completedFuture(Map.<String, 
ApiError>of());
+                }
+                Map<String, CompletableFuture<Throwable>> calls = new 
HashMap<>(groupsWithStored.size());
+                for (String groupId : groupsWithStored) {
+                    CompletableFuture<Throwable> outcome;
+                    try {
+                        outcome = p.deleteTopology(groupId).handle((unused, 
throwable) -> throwable);
+                    } catch (Throwable t) {
+                        outcome = CompletableFuture.completedFuture(t);
+                    }
+                    calls.put(groupId, outcome);
+                }
+                CompletableFuture<?>[] all = calls.values().toArray(new 
CompletableFuture<?>[0]);
+                return CompletableFuture.allOf(all).thenApply(unused -> {
+                    Map<String, ApiError> failures = new HashMap<>();
+                    calls.forEach((groupId, throwableFuture) -> {
+                        Throwable t = throwableFuture.join();
+                        if (t != null) {
+                            Throwable cause = t instanceof CompletionException 
&& t.getCause() != null
+                                ? t.getCause() : t;
+                            log.warn("Topology description plugin failed to 
delete topology for streams group {}.",
+                                groupId, cause);
+                            backoff.clear(groupId);
+                            failures.put(groupId, new 
ApiError(Errors.GROUP_DELETION_FAILED, cause.getMessage()));
+                        } else {
+                            backoff.clear(groupId);
+                        }
+                    });
+                    return failures;
+                });
+            })
+            .exceptionally(exception -> {
+                // A failure to read the streams group state (NOT_COORDINATOR 
etc.) is treated
+                // the same as a plugin failure: every retained group is held 
back from
+                // tombstoning so the caller can retry. The downstream chain 
prefers the more
+                // specific error from the runtime over GROUP_DELETION_FAILED.
+                Errors error = Errors.forException(exception);
+                Map<String, ApiError> failures = new HashMap<>();
+                for (String groupId : groupIds) {
+                    failures.put(groupId, new ApiError(error, 
exception.getMessage()));
+                }
+                return failures;
+            });
+    }
+
+    // Visible for testing.
+    StreamsGroupTopologyDescriptionBackoff backoff() {
+        return backoff;
+    }
+
+    private static boolean 
responseHasStaleTopology(StreamsGroupHeartbeatResponseData response) {
+        if (response.status() == null) {
+            return false;
+        }
+        byte staleCode = Status.STALE_TOPOLOGY.code();
+        return response.status().stream().anyMatch(s -> s.statusCode() == 
staleCode);
+    }
+
+    private static StreamsGroupTopologyDescriptionUpdateResponseData 
errorResponse(
+        Errors error,
+        String message
+    ) {
+        return new StreamsGroupTopologyDescriptionUpdateResponseData()
+            .setErrorCode(error.code())
+            .setErrorMessage(message);
+    }
+
+    /**
+     * Calls the plugin's {@code setTopology} and folds the result into a 
{@link PluginOutcome}.
+     * The future never completes exceptionally — the outcome carries the 
failure category.
+     */
+    private static CompletableFuture<PluginOutcome> invokePluginSetTopology(
+        StreamsGroupTopologyDescriptionPlugin plugin,
+        String groupId,
+        int pushedEpoch,
+        StreamsGroupTopologyDescription description
+    ) {
+        final CompletableFuture<Void> pluginFuture;
+        try {
+            pluginFuture = plugin.setTopology(groupId, pushedEpoch, 
description);
+        } catch (Throwable t) {
+            // A synchronous throw from the plugin is treated as a permanent 
failure with a
+            // generic client-visible message.
+            return 
CompletableFuture.completedFuture(PluginOutcome.permanent(t.getMessage()));
+        }

Review Comment:
   Agree — and this is already flagged on #22552 where invokePluginSetTopology 
lives. The KIP wants a generic message and t.getMessage() can be null, so we 
should use a fixed string there rather than forwarding it. Will address on 
#22552.
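
   A rough sketch of the fixed-string approach, only to make the intent concrete. Everything here is hypothetical: the constant's wording, the class name, and the simplified `Optional<String>` return type (the real method returns a PluginOutcome). It covers only the synchronous-throw branch; asynchronous failures are out of scope for the sketch.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class GenericMessageSketch {

    // Hypothetical wording; the real text would follow whatever the KIP specifies.
    static final String GENERIC_FAILURE_MESSAGE =
        "The topology description plugin failed to process the request.";

    // Returns the client-visible error message for a synchronous plugin throw,
    // or an empty Optional once the plugin future completes normally.
    static CompletableFuture<Optional<String>> invoke(Supplier<CompletableFuture<Void>> pluginCall) {
        final CompletableFuture<Void> pluginFuture;
        try {
            pluginFuture = pluginCall.get();
        } catch (Throwable t) {
            // Deliberately not t.getMessage(): it can be null and is not generic.
            return CompletableFuture.completedFuture(Optional.of(GENERIC_FAILURE_MESSAGE));
        }
        return pluginFuture.thenApply(unused -> Optional.<String>empty());
    }

    public static void main(String[] args) {
        // new RuntimeException() has a null message, the case the fixed string guards against.
        Optional<String> message = invoke(() -> { throw new RuntimeException(); }).join();
        System.out.println(message.orElse("<no error>"));
    }
}
```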



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
