lucasbru commented on code in PR #22552:
URL: https://github.com/apache/kafka/pull/22552#discussion_r3418882696


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public StreamsGroupTopologyDescriptionManager(
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * Release plugin-side resources. The plugin is instantiated by the 
service via
+     * {@code config.getConfiguredInstance(...)}, so the service owns it and 
must close
+     * it on shutdown to avoid leaking threads, network clients, etc. across 
broker
+     * restart cycles.
+     */
+    @Override
+    public void close() throws Exception {
+        if (plugin.isPresent()) {
+            plugin.get().close();
+        }
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the request is at a version that carries the 
field
+     * ({@code TopologyDescriptionRequired} arrives at v1), the topology 
description plugin
+     * is configured, the group has resolved to a topology epoch, that epoch 
is neither
+     * stored nor permanently failed at the plugin, no back-off is in effect 
for this
+     * epoch, and the response does not carry a {@code STALE_TOPOLOGY} status 
(the member
+     * would just be told to catch up first). When the response already 
carries an error
+     * code we leave it alone.
+     *
+     * <p>The version gate is intentional: a v0 client cannot deserialize the 
flag, so
+     * arming the back-off for it would accumulate entries that grow 
exponentially while
+     * the flag itself gets dropped at serialization — wasting heap on a 
per-group basis
+     * for clients that will never push.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId,
+        int apiVersion
+    ) {
+        if (apiVersion < 1 || plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {

Review Comment:
   backoffAction defaults to ARM, so a failure *before* the plugin is called — 
validateStreamsGroupMember throwing UNKNOWN_MEMBER_ID/GROUP_ID_NOT_FOUND, or 
fromRequest throwing — still lands here and arms the back-off. A fenced/stale 
member (or, once routing lands in 3/3, an unauthorized caller) pushing at the 
current epoch would then suppress TopologyDescriptionRequired for a legitimate 
member until the window expires. Should only genuine post-plugin transient 
failures arm? Validation/conversion failures probably shouldn't touch the 
back-off at all.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4466,7 +4472,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
         if (instanceId == null) {
             StreamsGroupMember member = group.getMemberOrThrow(memberId);
             log.info("[GroupId {}][MemberId {}] Member {} left the streams 
group.", groupId, memberId, memberId);
-            return streamsGroupFenceMember(group, member, new 
StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch()));
+            return streamsGroupFenceMember(group, member, new 
StreamsGroupHeartbeatResult(
+                response,
+                Map.of(),
+                group.currentTopologyEpoch(),

Review Comment:
   This is a leave/fence path: the response is NONE-error with a real 
currentTopologyEpoch, so the heartbeat post-processing will set 
TopologyDescriptionRequired and arm the back-off for a departing member. That 
spends the solicitation window on a member that's gone and delays the push 
request to remaining members. Could we gate solicitation to active heartbeats 
and skip leave/fence?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public StreamsGroupTopologyDescriptionManager(
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * Release plugin-side resources. The plugin is instantiated by the 
service via
+     * {@code config.getConfiguredInstance(...)}, so the service owns it and 
must close
+     * it on shutdown to avoid leaking threads, network clients, etc. across 
broker
+     * restart cycles.
+     */
+    @Override
+    public void close() throws Exception {
+        if (plugin.isPresent()) {
+            plugin.get().close();
+        }
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the request is at a version that carries the 
field
+     * ({@code TopologyDescriptionRequired} arrives at v1), the topology 
description plugin
+     * is configured, the group has resolved to a topology epoch, that epoch 
is neither
+     * stored nor permanently failed at the plugin, no back-off is in effect 
for this
+     * epoch, and the response does not carry a {@code STALE_TOPOLOGY} status 
(the member
+     * would just be told to catch up first). When the response already 
carries an error
+     * code we leave it alone.
+     *
+     * <p>The version gate is intentional: a v0 client cannot deserialize the 
flag, so
+     * arming the back-off for it would accumulate entries that grow 
exponentially while
+     * the flag itself gets dropped at serialization — wasting heap on a 
per-group basis
+     * for clients that will never push.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId,
+        int apiVersion
+    ) {
+        if (apiVersion < 1 || plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(

Review Comment:
   If the group is deleted between the plugin success and this write, 
streamsGroupSetTopologyDescriptionEpoch throws GroupIdNotFoundException, the 
thenApply that sets CLEAR never runs, and whenComplete re-arms the back-off for 
a now-gone group while the client gets UNKNOWN_SERVER_ERROR even though the 
push succeeded. Group-not-found on the post-plugin write probably wants to be a 
clear, not an arm.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;

Review Comment:
   Could we keep runtime and topicPartitionFor out of the manager and drive the 
chain from the service? The service already owns both and orchestrates every 
other RPC's read/write ops, so the manager could shrink to just plugin + 
back-off state:
   
   - constructor becomes `(plugin, time)` (Builder.build() simplifies too)
   - expose `invokeSetTopology(groupId, epoch, description) -> 
CompletableFuture<PluginOutcome>` (the existing invokePluginSetTopology, never 
completes exceptionally) plus `armBackoff`/`clearBackoff`
   - PluginOutcome becomes public; BackoffAction and errorResponse move to the 
service
   
   Then streamsGroupTopologyDescriptionUpdate holds the chain:
   
   ```java
   return runtime.scheduleReadOperation("...validate", tp,
           (c, off) -> { c.validateStreamsGroupMember(groupId, memberId, off); 
return null; })
       .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
       .thenCompose(desc -> mgr.invokeSetTopology(groupId, pushedEpoch, desc))
       .thenCompose(outcome -> switch (outcome.kind()) {
           case SUCCESS  -> runtime.scheduleWriteOperation("...set-stored...", 
tp, c -> c.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false))
               .thenApply(u -> { backoffAction.set(CLEAR); return new 
StreamsGroupTopologyDescriptionUpdateResponseData(); });
           case PERMANENT -> runtime.scheduleWriteOperation("...set-failed...", 
tp, c -> c.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true))
               .thenApply(u -> { backoffAction.set(CLEAR); return 
errorResponse(...); });
           case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(...));
       })
       .whenComplete((r, t) -> { if (backoffAction.get() == CLEAR) 
mgr.clearBackoff(groupId); else mgr.armBackoff(groupId, pushedEpoch); })
       .exceptionally(...);
   ```
   
   The existing update tests already go through the service entry point, so 
they'd be unaffected. This is also a natural spot to only arm on a genuine 
plugin transient failure rather than on every pre-plugin failure (the 
validate/convert comment above).



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoff.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory per-group back-off that throttles broker re-solicitation of a 
topology
+ * description push. An entry is armed when the broker decides to set
+ * {@code TopologyDescriptionRequired=true} on a heartbeat or after a 
transient plugin
+ * failure; consecutive arms at the same topology epoch double the window from
+ * {@value #INITIAL_DELAY_MS} ms up to {@value #MAX_DELAY_MS} ms. Successful 
pushes,
+ * permanent plugin failures, and topology-epoch advances clear the entry.
+ */
+public class StreamsGroupTopologyDescriptionBackoff {
+
+    static final long INITIAL_DELAY_MS = 30_000L;
+    static final long MAX_DELAY_MS = 3_600_000L;
+
+    private final Time time;
+    private final ConcurrentHashMap<String, Entry> state = new 
ConcurrentHashMap<>();
+
+    record Entry(int topologyEpoch, long currentDelayMs, long nextAttemptMs) { 
}
+
+    public StreamsGroupTopologyDescriptionBackoff(Time time) {
+        this.time = time;
+    }
+
+    /**
+     * @return true if a back-off window is in effect for the given group at 
the given
+     *         topology epoch and the broker should suppress soliciting 
another push.
+     */
+    public boolean isActive(String groupId, int topologyEpoch) {
+        Entry entry = state.get(groupId);
+        return entry != null
+            && entry.topologyEpoch() == topologyEpoch
+            && time.milliseconds() < entry.nextAttemptMs();
+    }
+
+    /**
+     * Atomic check-and-arm. Returns true if no window was in effect and a new 
one was
+     * armed, false if a window was already active and nothing changed. Used 
on the
+     * heartbeat path to fold the "check + arm" pair into a single compute so 
two
+     * concurrent heartbeats for the same group cannot both arm the back-off, 
and to
+     * preserve the exponential chain when the previous window has expired 
without a
+     * push reaching the coordinator (e.g. the client never sent one, or the 
push was
+     * lost in flight before {@link #armOrExtend} could run).
+     */
+    public boolean armIfNotActive(String groupId, int topologyEpoch) {
+        final long now = time.milliseconds();
+        final boolean[] armed = new boolean[]{false};
+        state.compute(groupId, (key, existing) -> {
+            if (existing != null
+                && existing.topologyEpoch() == topologyEpoch
+                && now < existing.nextAttemptMs()) {
+                return existing;
+            }
+            armed[0] = true;
+            // Re-arm: continue the exponential chain at the same epoch (the 
previous
+            // window expired without a push completing); reset to 
INITIAL_DELAY_MS on
+            // an epoch advance, which implicitly drops the prior history.
+            if (existing != null && existing.topologyEpoch() == topologyEpoch) 
{
+                long nextDelay = Math.min(existing.currentDelayMs() * 2, 
MAX_DELAY_MS);

Review Comment:
   This doubling is hand-rolled here and again in armOrExtend, and 
re-implements ExponentialBackoff (used in lots of places in this module). 
Beyond the duplicated curve, we lose jitter — every group armed in the same 
window re-solicits on the same 30/60/120s boundaries, a thundering herd after a 
transient plugin outage. Could store an attempt count in Entry and delegate to 
ExponentialBackoff.backoff(attempt).



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java:
##########
@@ -26,20 +26,40 @@
 /**
  * A simple record to hold the result of a StreamsGroupHeartbeat request.
  *
- * @param data                  The data to be returned to the client.
- * @param creatableTopics       The internal topics to be created.
- * @param currentTopologyEpoch  The topology epoch the group is operating at 
after this heartbeat, or -1 if the
- *                              group has no topology yet. The service layer 
uses this to decide whether to set
- *                              TopologyDescriptionRequired on the response 
(KIP-1331).
+ * <p>The three epoch fields let the service layer decide, without re-reading 
the group,
+ * whether to set {@code TopologyDescriptionRequired} on the response: a push 
is needed
+ * when the stored epoch lags the current epoch and the same epoch has not 
already been
+ * recorded as a permanent failure. All three are -1 for failure-fast paths 
that do not
+ * resolve a group.
+ *
+ * @param data                              The data to be returned to the 
client.
+ * @param creatableTopics                   The internal topics to be created.
+ * @param currentTopologyEpoch              The topology epoch the group is 
operating at after this heartbeat,
+ *                                          or -1 if the group has no topology 
yet.
+ * @param storedDescriptionTopologyEpoch    The most recent topology epoch 
successfully stored by the topology
+ *                                          description plugin, or -1 if none.
+ * @param failedDescriptionTopologyEpoch    The most recent topology epoch the 
plugin permanently rejected,
+ *                                          or -1 if none.
  */
 public record StreamsGroupHeartbeatResult(
     StreamsGroupHeartbeatResponseData data,
     Map<String, CreatableTopic> creatableTopics,
-    int currentTopologyEpoch
+    int currentTopologyEpoch,

Review Comment:
   These three trailing epochs are all ints, and the 1-arg convenience ctor 
fills them with -1. A future construction site that transposes two of them, or 
uses the convenience ctor on a real-group path, compiles clean and silently 
disables (or wrongly triggers) solicitation. Might be worth an epoch-bundle 
type, or deriving the epochs from the group in GMM, rather than three 
positional ints at each site.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public StreamsGroupTopologyDescriptionManager(
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * Release plugin-side resources. The plugin is instantiated by the 
service via
+     * {@code config.getConfiguredInstance(...)}, so the service owns it and 
must close
+     * it on shutdown to avoid leaking threads, network clients, etc. across 
broker
+     * restart cycles.
+     */
+    @Override
+    public void close() throws Exception {
+        if (plugin.isPresent()) {
+            plugin.get().close();
+        }
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the request is at a version that carries the 
field
+     * ({@code TopologyDescriptionRequired} arrives at v1), the topology 
description plugin
+     * is configured, the group has resolved to a topology epoch, that epoch 
is neither
+     * stored nor permanently failed at the plugin, no back-off is in effect 
for this
+     * epoch, and the response does not carry a {@code STALE_TOPOLOGY} status 
(the member
+     * would just be told to catch up first). When the response already 
carries an error
+     * code we leave it alone.
+     *
+     * <p>The version gate is intentional: a v0 client cannot deserialize the 
flag, so
+     * arming the back-off for it would accumulate entries that grow 
exponentially while
+     * the flag itself gets dropped at serialization — wasting heap on a 
per-group basis
+     * for clients that will never push.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId,
+        int apiVersion
+    ) {
+        if (apiVersion < 1 || plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();

Review Comment:
   We key and persist everything off request.topologyEpoch() but never check it 
against the group's current/validated epoch (validateStreamsGroupMember only 
checks membership). A stale push at epoch 4 while the group is at 5 still 
writes storedDescriptionTopologyEpoch=4 — which can even regress the stored 
epoch — and the heartbeat gate (stored==current) then never converges and 
re-solicits forever. Worth validating pushedEpoch against the current epoch 
before accepting it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -633,6 +655,7 @@ public CompletableFuture<StreamsGroupHeartbeatResult> 
streamsGroupHeartbeat(
             "streams-group-heartbeat",
             topicPartitionFor(request.groupId()),
             coordinator -> coordinator.streamsGroupHeartbeat(context, request)
+        ).thenApply(result -> 
streamsGroupTopologyDescriptionManager.maybeSetTopologyDescriptionRequired(result,
 request.groupId(), context.requestVersion())

Review Comment:
   Two small things on this thenApply: (1) it runs before the exceptionally 
below, so if maybeSetTopologyDescriptionRequired ever threw (e.g. a null Status 
element in the stream) it would turn an already-committed successful heartbeat 
into an error response; (2) the lambda + stage are allocated on every heartbeat 
even when no plugin is configured — could gate on isPluginConfigured() at 
entry. Both minor/latent.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoff.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory per-group back-off that throttles broker re-solicitation of a 
topology
+ * description push. An entry is armed when the broker decides to set
+ * {@code TopologyDescriptionRequired=true} on a heartbeat or after a 
transient plugin
+ * failure; consecutive arms at the same topology epoch double the window from
+ * {@value #INITIAL_DELAY_MS} ms up to {@value #MAX_DELAY_MS} ms. Successful 
pushes,
+ * permanent plugin failures, and topology-epoch advances clear the entry.
+ */
+public class StreamsGroupTopologyDescriptionBackoff {
+
+    static final long INITIAL_DELAY_MS = 30_000L;
+    static final long MAX_DELAY_MS = 3_600_000L;
+
+    private final Time time;
+    private final ConcurrentHashMap<String, Entry> state = new 
ConcurrentHashMap<>();
+
+    record Entry(int topologyEpoch, long currentDelayMs, long nextAttemptMs) { 
}
+
+    public StreamsGroupTopologyDescriptionBackoff(Time time) {
+        this.time = time;
+    }
+
+    /**
+     * @return true if a back-off window is in effect for the given group at 
the given
+     *         topology epoch and the broker should suppress soliciting 
another push.
+     */
+    public boolean isActive(String groupId, int topologyEpoch) {
+        Entry entry = state.get(groupId);
+        return entry != null
+            && entry.topologyEpoch() == topologyEpoch
+            && time.milliseconds() < entry.nextAttemptMs();
+    }
+
+    /**
+     * Atomic check-and-arm. Returns true if no window was in effect and a new 
one was
+     * armed, false if a window was already active and nothing changed. Used 
on the
+     * heartbeat path to fold the "check + arm" pair into a single compute so 
two
+     * concurrent heartbeats for the same group cannot both arm the back-off, 
and to
+     * preserve the exponential chain when the previous window has expired 
without a
+     * push reaching the coordinator (e.g. the client never sent one, or the 
push was
+     * lost in flight before {@link #armOrExtend} could run).
+     */
+    public boolean armIfNotActive(String groupId, int topologyEpoch) {
+        final long now = time.milliseconds();
+        final boolean[] armed = new boolean[]{false};

Review Comment:
   Nit: the boolean[] armed holder is just to get a result out of compute(); it 
could be derived by comparing the entry compute returns against the prior 
snapshot. Easy to forget to set armed[0] when adding a branch.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public StreamsGroupTopologyDescriptionManager(
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * Release plugin-side resources. The plugin is instantiated by the 
service via
+     * {@code config.getConfiguredInstance(...)}, so the service owns it and 
must close
+     * it on shutdown to avoid leaking threads, network clients, etc. across 
broker
+     * restart cycles.
+     */
+    @Override
+    public void close() throws Exception {
+        if (plugin.isPresent()) {
+            plugin.get().close();
+        }
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the request is at a version that carries the 
field
+     * ({@code TopologyDescriptionRequired} arrives at v1), the topology 
description plugin
+     * is configured, the group has resolved to a topology epoch, that epoch 
is neither
+     * stored nor permanently failed at the plugin, no back-off is in effect 
for this
+     * epoch, and the response does not carry a {@code STALE_TOPOLOGY} status 
(the member
+     * would just be told to catch up first). When the response already 
carries an error
+     * code we leave it alone.
+     *
+     * <p>The version gate is intentional: a v0 client cannot deserialize the 
flag, so
+     * arming the back-off for it would accumulate entries that grow 
exponentially while
+     * the flag itself gets dropped at serialization — wasting heap on a 
per-group basis
+     * for clients that will never push.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId,
+        int apiVersion
+    ) {
+        if (apiVersion < 1 || plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {
+                    backoff.clear(groupId);
+                } else {
+                    backoff.armOrExtend(groupId, pushedEpoch);

Review Comment:
   Minor: when the heartbeat path already armed at this epoch (30s) and the 
push then fails transiently within that window, armOrExtend doubles it to 60s — 
so the first transient back-off is 60s rather than the 30s initial, because the 
solicit-arm and the fail-arm compound. Probably fine, but worth a comment if 
intended.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public StreamsGroupTopologyDescriptionManager(
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * Release plugin-side resources. The plugin is instantiated by the 
service via
+     * {@code config.getConfiguredInstance(...)}, so the service owns it and 
must close
+     * it on shutdown to avoid leaking threads, network clients, etc. across 
broker
+     * restart cycles.
+     */
+    @Override
+    public void close() throws Exception {
+        if (plugin.isPresent()) {
+            plugin.get().close();
+        }
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the request is at a version that carries the 
field
+     * ({@code TopologyDescriptionRequired} arrives at v1), the topology 
description plugin
+     * is configured, the group has resolved to a topology epoch, that epoch 
is neither
+     * stored nor permanently failed at the plugin, no back-off is in effect 
for this
+     * epoch, and the response does not carry a {@code STALE_TOPOLOGY} status 
(the member
+     * would just be told to catch up first). When the response already 
carries an error
+     * code we leave it alone.
+     *
+     * <p>The version gate is intentional: a v0 client cannot deserialize the 
flag, so
+     * arming the back-off for it would accumulate entries that grow 
exponentially while
+     * the flag itself gets dropped at serialization — wasting heap on a 
per-group basis
+     * for clients that will never push.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId,
+        int apiVersion
+    ) {
+        if (apiVersion < 1 || plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);

Review Comment:
   The AtomicReference<BackoffAction> is effectively a mutable-cell state 
machine seeded to ARM and written inside the two thenApply lambdas — easy for a 
future branch to forget to set, silently defaulting to ARM. Could we thread the 
disposition through the returned future (a small record) instead of a shared 
cell read in whenComplete?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public StreamsGroupTopologyDescriptionManager(
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * Release plugin-side resources. The plugin is instantiated by the 
service via
+     * {@code config.getConfiguredInstance(...)}, so the service owns it and 
must close
+     * it on shutdown to avoid leaking threads, network clients, etc. across 
broker
+     * restart cycles.
+     */
+    @Override
+    public void close() throws Exception {
+        if (plugin.isPresent()) {
+            plugin.get().close();
+        }
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the request is at a version that carries the 
field
+     * ({@code TopologyDescriptionRequired} arrives at v1), the topology 
description plugin
+     * is configured, the group has resolved to a topology epoch, that epoch 
is neither
+     * stored nor permanently failed at the plugin, no back-off is in effect 
for this
+     * epoch, and the response does not carry a {@code STALE_TOPOLOGY} status 
(the member
+     * would just be told to catch up first). When the response already 
carries an error
+     * code we leave it alone.
+     *
+     * <p>The version gate is intentional: a v0 client cannot deserialize the 
flag, so
+     * arming the back-off for it would accumulate entries that grow 
exponentially while
+     * the flag itself gets dropped at serialization — wasting heap on a 
per-group basis
+     * for clients that will never push.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId,
+        int apiVersion
+    ) {
+        if (apiVersion < 1 || plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {

Review Comment:
   The RPC response now completes only after the stored/failed-epoch record 
commits (thenCompose), vs the POC's fire-and-forget write. Arguably more 
correct (we don't tell the client 'stored' until it's durable), but it means a 
slow __consumer_offsets commit blocks the push response and can exceed the 
client request timeout, leading to a re-push even though the plugin call 
already succeeded. Worth sanity-checking against the timeout.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public StreamsGroupTopologyDescriptionManager(
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * Release plugin-side resources. The plugin is instantiated by the 
service via
+     * {@code config.getConfiguredInstance(...)}, so the service owns it and 
must close
+     * it on shutdown to avoid leaking threads, network clients, etc. across 
broker
+     * restart cycles.
+     */
+    @Override
+    public void close() throws Exception {
+        if (plugin.isPresent()) {
+            plugin.get().close();
+        }
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the request is at a version that carries the 
field
+     * ({@code TopologyDescriptionRequired} arrives at v1), the topology 
description plugin
+     * is configured, the group has resolved to a topology epoch, that epoch 
is neither
+     * stored nor permanently failed at the plugin, no back-off is in effect 
for this
+     * epoch, and the response does not carry a {@code STALE_TOPOLOGY} status 
(the member
+     * would just be told to catch up first). When the response already 
carries an error
+     * code we leave it alone.
+     *
+     * <p>The version gate is intentional: a v0 client cannot deserialize the 
flag, so
+     * arming the back-off for it would accumulate entries that grow 
exponentially while
+     * the flag itself gets dropped at serialization — wasting heap on a 
per-group basis
+     * for clients that will never push.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId,
+        int apiVersion
+    ) {
+        if (apiVersion < 1 || plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {

Review Comment:
   clear(groupId) / armOrExtend(groupId, pushedEpoch) mutate the whole 
per-group entry with no epoch guard, while the read side 
(isActive/armIfNotActive) does check the epoch. If a heartbeat armed a window 
at epoch 6 and a late push for epoch 5 completes, clear() wipes the epoch-6 
window and the next heartbeat re-solicits immediately. Could the mutation side 
compare the epoch too?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionManager.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import 
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import 
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group 
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the entry points the group coordinator calls 
into —
+ * heartbeat post-processing and the push RPC. The {@code DeleteGroups} hook 
lands in
+ * a follow-up sub-task.
+ *
+ * <p>This class is broker-level (one instance per {@code 
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions 
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is 
rebuilt from
+ * scratch on broker restart, and the persisted {@code 
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver 
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) 
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class StreamsGroupTopologyDescriptionManager implements AutoCloseable {
+    private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+    private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> 
runtime;
+    private final Function<String, TopicPartition> topicPartitionFor;
+
+    public StreamsGroupTopologyDescriptionManager(
+        Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+        Function<String, TopicPartition> topicPartitionFor,
+        Time time
+    ) {
+        this.plugin = plugin;
+        this.runtime = runtime;
+        this.topicPartitionFor = topicPartitionFor;
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+    }
+
+    /**
+     * Release plugin-side resources. The plugin is instantiated by the 
service via
+     * {@code config.getConfiguredInstance(...)}, so the service owns it and 
must close
+     * it on shutdown to avoid leaking threads, network clients, etc. across 
broker
+     * restart cycles.
+     */
+    @Override
+    public void close() throws Exception {
+        if (plugin.isPresent()) {
+            plugin.get().close();
+        }
+    }
+
+    /**
+     * @return true if a topology description plugin is configured on this 
broker.
+     */
+    public boolean isPluginConfigured() {
+        return plugin.isPresent();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding 
whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the 
response, and
+     * arming the per-group back-off when it does.
+     *
+     * <p>The flag is set when the request is at a version that carries the 
field
+     * ({@code TopologyDescriptionRequired} arrives at v1), the topology 
description plugin
+     * is configured, the group has resolved to a topology epoch, that epoch 
is neither
+     * stored nor permanently failed at the plugin, no back-off is in effect 
for this
+     * epoch, and the response does not carry a {@code STALE_TOPOLOGY} status 
(the member
+     * would just be told to catch up first). When the response already 
carries an error
+     * code we leave it alone.
+     *
+     * <p>The version gate is intentional: a v0 client cannot deserialize the 
flag, so
+     * arming the back-off for it would accumulate entries that grow 
exponentially while
+     * the flag itself gets dropped at serialization — wasting heap on a 
per-group basis
+     * for clients that will never push.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId,
+        int apiVersion
+    ) {
+        if (apiVersion < 1 || plugin.isEmpty()) {
+            return result;
+        }
+        StreamsGroupHeartbeatResponseData response = result.data();
+        if (response.errorCode() != Errors.NONE.code()) {
+            return result;
+        }
+        int currentEpoch = result.currentTopologyEpoch();
+        if (currentEpoch < 0
+            || result.storedDescriptionTopologyEpoch() == currentEpoch
+            || result.failedDescriptionTopologyEpoch() == currentEpoch
+            || responseHasStaleTopology(response)) {
+            return result;
+        }
+        // Atomic check-and-arm: only set the flag if the back-off window is 
not already
+        // in effect for this epoch, so two concurrent heartbeats for the same 
group cannot
+        // both arm the back-off and double the window beyond its intended 
length.
+        if (backoff.armIfNotActive(groupId, currentEpoch)) {
+            response.setTopologyDescriptionRequired(true);
+        }
+        return result;
+    }
+
+    /**
+     * Drive the push chain: validate the (group, member), convert the wire 
payload, call
+     * the plugin, persist the outcome, and centralize back-off state 
mutations in a
+     * single {@code whenComplete}.
+     *
+     * <p>The chain carries the terminal disposition through an {@link 
AtomicReference}
+     * holder so {@code whenComplete} can act on it without having to reason 
about the
+     * response shape. Default is {@link BackoffAction#ARM}; any post-plugin 
failure
+     * (including a metadata-record write that fails after a successful plugin 
call)
+     * therefore re-arms the back-off and the next heartbeat re-solicits an 
idempotent
+     * re-push.
+     */
+    public 
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> 
handleSetTopology(
+        StreamsGroupTopologyDescriptionUpdateRequestData request
+    ) {
+        final String groupId = request.groupId();
+        final String memberId = request.memberId();
+        final int pushedEpoch = request.topologyEpoch();
+        final TopicPartition tp = topicPartitionFor.apply(groupId);
+        final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+        final AtomicReference<BackoffAction> backoffAction = new 
AtomicReference<>(BackoffAction.ARM);
+        return runtime.scheduleReadOperation(
+                "streams-group-topology-description-validate",
+                tp,
+                (coordinator, lastCommittedOffset) -> {
+                    coordinator.validateStreamsGroupMember(groupId, memberId, 
lastCommittedOffset);
+                    return null;
+                })
+            .thenApply(__ -> 
StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+            .thenCompose(description -> invokePluginSetTopology(p, groupId, 
pushedEpoch, description))
+            .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+                case SUCCESS -> runtime.scheduleWriteOperation(
+                    "streams-group-set-stored-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return new 
StreamsGroupTopologyDescriptionUpdateResponseData();
+                });
+                case PERMANENT -> runtime.scheduleWriteOperation(
+                    "streams-group-set-failed-topology-epoch",
+                    tp,
+                    coordinator -> 
coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+                ).thenApply(unused -> {
+                    backoffAction.set(BackoffAction.CLEAR);
+                    return errorResponse(
+                        Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message());
+                });
+                case TRANSIENT -> 
CompletableFuture.completedFuture(errorResponse(
+                    Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, 
pluginOutcome.message()));
+            })
+            .whenComplete((response, throwable) -> {
+                if (backoffAction.get() == BackoffAction.CLEAR) {
+                    backoff.clear(groupId);
+                } else {
+                    backoff.armOrExtend(groupId, pushedEpoch);
+                }
+            });
+    }
+
+    // Visible for testing.
+    StreamsGroupTopologyDescriptionBackoff backoff() {
+        return backoff;
+    }
+
+    private static boolean 
responseHasStaleTopology(StreamsGroupHeartbeatResponseData response) {
+        if (response.status() == null) {
+            return false;
+        }
+        byte staleCode = Status.STALE_TOPOLOGY.code();
+        return response.status().stream().anyMatch(s -> s.statusCode() == 
staleCode);
+    }
+
+    private static StreamsGroupTopologyDescriptionUpdateResponseData 
errorResponse(
+        Errors error,
+        String message
+    ) {
+        return new StreamsGroupTopologyDescriptionUpdateResponseData()
+            .setErrorCode(error.code())
+            .setErrorMessage(message);
+    }
+
+    /**
+     * Calls the plugin's {@code setTopology} and folds the result into a 
{@link PluginOutcome}.
+     * The future never completes exceptionally — the outcome carries the 
failure category.
+     */
+    private static CompletableFuture<PluginOutcome> invokePluginSetTopology(
+        StreamsGroupTopologyDescriptionPlugin plugin,
+        String groupId,
+        int pushedEpoch,
+        StreamsGroupTopologyDescription description
+    ) {
+        final CompletableFuture<Void> pluginFuture;
+        try {
+            pluginFuture = Objects.requireNonNull(plugin.setTopology(groupId, 
pushedEpoch, description),
+                "Plugin returned null future from setTopology.");
+        } catch (Exception e) {
+            // A synchronous throw or a null future both violate the SPI 
contract —
+            // implementations must signal failures by completing the future 
exceptionally.
+            // Treat either as a permanent failure with a stable, generic 
client-visible
+            // message so we don't forward an unbounded or null exception 
message that
+            // could leak plugin internals, and so a misbehaving plugin 
doesn't NPE the
+            // chain and degenerate into back-off-with-doubling instead of the
+            // permanent-failure handling it should get.
+            return CompletableFuture.completedFuture(
+                PluginOutcome.permanent("Topology description plugin 
failed."));
+        }
+        return pluginFuture.handle((unused, throwable) -> {
+            if (throwable == null) {
+                return PluginOutcome.success();
+            }
+            Throwable cause = Errors.maybeUnwrapException(throwable);

Review Comment:
   Errors.maybeUnwrapException can return null (a Completion/ExecutionException 
with a null cause), and then cause.getMessage() here NPEs. A non-null cause 
with a null message is fine (errorResponse tolerates it) — only the null-cause 
case throws inside handle() and falls through to ARM + UNKNOWN_SERVER_ERROR, 
losing the transient/permanent classification. A null guard would close it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to