lucasbru commented on code in PR #22554:
URL: https://github.com/apache/kafka/pull/22554#discussion_r3413698837
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1495,6 +1570,49 @@ private List<String> updateResponseAndGetNonErrorGroupList(
return groupSet.stream().toList();
}
+ /**
+ * Move plugin failures into {@code deletableGroupResults} and return the group ids
+ * that should still proceed to tombstoning.
+ *
+ * <p>Per KIP-1331, {@code GROUP_DELETION_FAILED} and the per-group {@code ErrorMessage}
+ * field were introduced in DeleteGroups v3. For older clients we downgrade the error to
+ * {@code UNKNOWN_SERVER_ERROR} with no message — matching the convention used by KIP-1043
+ * for new error codes that pre-existing request versions cannot interpret. Errors that
+ * predate this KIP (e.g. NOT_COORDINATOR surfaced from the runtime via the exceptionally
+ * branch above) are passed through unchanged.
+ */
+ private static List<String> filterStreamsTopologyErrors(
Review Comment:
This method is doing two unrelated things: filtering (partition groupIds
into retained-to-tombstone vs failed) and request-version management (the
GROUP_DELETION_FAILED -> UNKNOWN_SERVER_ERROR + null-message downgrade for
requestVersion < 3). The version handling is the only reason it needs the
requestVersion param.
Suggest splitting them: keep filterStreamsTopologyErrors as a pure,
version-agnostic filter that adds the raw ApiError to deletableGroupResults and
returns retained (drop the requestVersion param), and move the v<3 downgrade up
into KafkaApis.handleDeleteGroupsRequest, where requestVersion is already in
scope and the rest of the DeleteGroups response shaping lives. That's also
where new error codes are version-gated elsewhere (KIP-1043 precedent), and it
keeps the coordinator version-agnostic like the rest of the delete pipeline.
The effectiveMessage = null then drops out on its own: ErrorMessage is
versions 3+ / ignorable, so it's already stripped for old clients at the
serialization layer — only the error code actually needs gating.
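A minimal sketch of the suggested split, under stated assumptions: Err and Result are hypothetical stand-ins for Kafka's Errors and ApiError, and downgradeForVersion is a hypothetical helper standing in for the v<3 gating that would move into KafkaApis.handleDeleteGroupsRequest. The filter keeps the raw error and no longer takes a requestVersion parameter:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class DeleteGroupsSplitSketch {
    // Hypothetical stand-ins for Kafka's Errors and ApiError, for illustration only.
    enum Err { NONE, NOT_COORDINATOR, UNKNOWN_SERVER_ERROR, GROUP_DELETION_FAILED }

    record Result(String groupId, Err error, String message) { }

    // Coordinator side: a pure, version-agnostic partition of groupIds. Failed groups
    // are recorded with their raw error; the rest are returned for tombstoning.
    static List<String> filterStreamsTopologyErrors(
            List<String> groupIds,
            Map<String, Result> pluginFailures,
            List<Result> deletableGroupResults) {
        List<String> retained = new ArrayList<>();
        for (String groupId : groupIds) {
            Result failure = pluginFailures.get(groupId);
            if (failure == null) {
                retained.add(groupId);
            } else {
                deletableGroupResults.add(failure);
            }
        }
        return retained;
    }

    // API-layer side, where requestVersion is in scope: only the error code is gated.
    // The message is left untouched on the assumption that a v3+-only, ignorable
    // ErrorMessage field is dropped by serialization for older versions.
    static Result downgradeForVersion(Result result, short requestVersion) {
        if (requestVersion < 3 && result.error() == Err.GROUP_DELETION_FAILED) {
            return new Result(result.groupId(), Err.UNKNOWN_SERVER_ERROR, result.message());
        }
        return result;
    }
}
```

Errors that predate the KIP (NOT_COORDINATOR in the stand-in enum) pass through downgradeForVersion unchanged, which matches the behavior the Javadoc describes.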
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator calls
+ * into — heartbeat post-processing, the push RPC, and the {@code DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is rebuilt from
+ * scratch on broker restart, and the persisted {@code StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+ private final Logger log;
+ private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+ private final StreamsGroupTopologyDescriptionBackoff backoff;
+ private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
+ private final Function<String, TopicPartition> topicPartitionFor;
+
+ public TopologyDescriptionManager(
+ LogContext logContext,
+ Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+ Function<String, TopicPartition> topicPartitionFor,
+ Time time
+ ) {
+ this.log = logContext.logger(TopologyDescriptionManager.class);
+ this.plugin = plugin;
+ this.runtime = runtime;
+ this.topicPartitionFor = topicPartitionFor;
+ this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+ }
+
+ /**
+ * @return true if a topology description plugin is configured on this broker.
+ */
+ public boolean isPluginConfigured() {
+ return plugin.isPresent();
+ }
+
+ /**
+ * Post-processes a successful streams group heartbeat result by deciding whether the
+ * broker should set {@code TopologyDescriptionRequired=true} on the response, and
+ * arming the per-group back-off when it does.
+ *
+ * <p>The flag is set when the topology description plugin is configured, the group
+ * has resolved to a topology epoch, that epoch is neither stored nor permanently
+ * failed at the plugin, no back-off is in effect for this epoch, and the response
+ * does not carry a {@code STALE_TOPOLOGY} status (the member would just be told to
+ * catch up first). When the response already carries an error code we leave it
+ * alone.
+ */
+ public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+ StreamsGroupHeartbeatResult result,
+ String groupId
+ ) {
+ if (plugin.isEmpty()) {
+ return result;
+ }
+ StreamsGroupHeartbeatResponseData response = result.data();
+ if (response.errorCode() != Errors.NONE.code()) {
+ return result;
+ }
+ int currentEpoch = result.currentTopologyEpoch();
+ if (currentEpoch < 0
+ || result.storedDescriptionTopologyEpoch() == currentEpoch
+ || result.failedDescriptionTopologyEpoch() == currentEpoch
+ || responseHasStaleTopology(response)) {
+ return result;
+ }
+ // Atomic check-and-arm: only set the flag if the back-off window is not already
+ // in effect for this epoch, so two concurrent heartbeats for the same group cannot
+ // both arm the back-off and double the window beyond its intended length.
+ if (backoff.armIfNotActive(groupId, currentEpoch)) {
+ response.setTopologyDescriptionRequired(true);
+ }
+ return result;
+ }
+
+ /**
+ * Reject the request synchronously when no plugin is configured or the request fails
+ * basic structural validation. Returns the response to send back to the client, or
+ * empty when the request is accepted for further processing. The caller is expected
+ * to have already short-circuited on a non-active coordinator.
+ */
+ public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> preCheckTopologyDescriptionUpdate(
+ StreamsGroupTopologyDescriptionUpdateRequestData request
+ ) {
+ if (plugin.isEmpty()) {
+ return Optional.of(errorResponse(
+ Errors.UNSUPPORTED_VERSION,
+ "The broker has no streams group topology description plugin configured."
+ ));
+ }
+ if (request.memberId() == null || request.memberId().isEmpty()) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId can't be empty."));
+ }
+ if (request.groupId() == null || request.groupId().isEmpty()) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId can't be empty."));
+ }
+ if (request.topologyDescription() == null) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "TopologyDescription can't be null."));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Drive the push chain: validate the (group, member), convert the wire payload, call
+ * the plugin, persist the outcome, and centralize back-off state mutations in a
+ * single {@code whenComplete}.
+ *
+ * <p>The chain carries the terminal disposition through an {@link AtomicReference}
+ * holder so {@code whenComplete} can act on it without having to reason about the
+ * response shape. Default is {@link BackoffAction#ARM}; any post-plugin failure
+ * (including a metadata-record write that fails after a successful plugin call)
+ * therefore re-arms the back-off and the next heartbeat re-solicits an idempotent
+ * re-push.
+ */
+ public CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> handleSetTopology(
+ StreamsGroupTopologyDescriptionUpdateRequestData request
+ ) {
+ final String groupId = request.groupId();
+ final String memberId = request.memberId();
+ final int pushedEpoch = request.topologyEpoch();
+ final TopicPartition tp = topicPartitionFor.apply(groupId);
+ final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+ final AtomicReference<BackoffAction> backoffAction = new AtomicReference<>(BackoffAction.ARM);
+ return runtime.scheduleReadOperation(
+ "streams-group-topology-description-validate",
+ tp,
+ (coordinator, lastCommittedOffset) -> {
+ coordinator.validateStreamsGroupMember(groupId, memberId, lastCommittedOffset);
+ return null;
+ })
+ .thenApply(__ -> StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+ .thenCompose(description -> invokePluginSetTopology(p, groupId, pushedEpoch, description))
+ .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+ case SUCCESS -> runtime.scheduleWriteOperation(
+ "streams-group-set-stored-topology-epoch",
+ tp,
+ coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+ ).thenApply(unused -> {
+ backoffAction.set(BackoffAction.CLEAR);
+ return new StreamsGroupTopologyDescriptionUpdateResponseData();
+ });
+ case PERMANENT -> runtime.scheduleWriteOperation(
+ "streams-group-set-failed-topology-epoch",
+ tp,
+ coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+ ).thenApply(unused -> {
+ backoffAction.set(BackoffAction.CLEAR);
+ return errorResponse(
+ Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message());
+ });
+ case TRANSIENT -> CompletableFuture.completedFuture(errorResponse(
+ Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message()));
+ })
+ .whenComplete((response, throwable) -> {
+ if (backoffAction.get() == BackoffAction.CLEAR) {
+ backoff.clear(groupId);
+ } else {
+ backoff.armOrExtend(groupId, pushedEpoch);
+ }
+ });
+ }
+
+ /**
+ * Call {@code plugin.deleteTopology} for every supplied group id that is a streams
+ * group carrying a stored topology description. Returns a per-group map of failures
+ * keyed by group id; groups absent from the map either had no plugin state or the
+ * plugin call succeeded. Empty (no-op) when no plugin is configured. Used by the
+ * service-level {@code DeleteGroups} flow to gate tombstoning on plugin success.
+ */
+ public CompletableFuture<Map<String, ApiError>> deleteBeforeGroupDelete(
+ TopicPartition topicPartition,
+ List<String> groupIds
+ ) {
+ if (plugin.isEmpty()) {
+ return CompletableFuture.completedFuture(Map.of());
+ }
+ final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+ return runtime.scheduleReadOperation(
+ "streams-group-topology-pre-delete",
+ topicPartition,
+ (coordinator, lastCommittedOffset) ->
+ coordinator.streamsGroupsWithStoredTopologyDescription(groupIds, lastCommittedOffset))
+ .thenCompose(groupsWithStored -> {
+ if (groupsWithStored.isEmpty()) {
+ return CompletableFuture.completedFuture(Map.<String, ApiError>of());
+ }
+ Map<String, CompletableFuture<Throwable>> calls = new HashMap<>(groupsWithStored.size());
+ for (String groupId : groupsWithStored) {
+ CompletableFuture<Throwable> outcome;
+ try {
+ outcome = p.deleteTopology(groupId).handle((unused, throwable) -> throwable);
+ } catch (Throwable t) {
+ outcome = CompletableFuture.completedFuture(t);
+ }
+ calls.put(groupId, outcome);
+ }
+ CompletableFuture<?>[] all = calls.values().toArray(new CompletableFuture<?>[0]);
+ return CompletableFuture.allOf(all).thenApply(unused -> {
+ Map<String, ApiError> failures = new HashMap<>();
+ calls.forEach((groupId, throwableFuture) -> {
+ Throwable t = throwableFuture.join();
+ if (t != null) {
+ Throwable cause = t instanceof CompletionException && t.getCause() != null
+ ? t.getCause() : t;
+ log.warn("Topology description plugin failed to delete topology for streams group {}.",
+ groupId, cause);
+ backoff.clear(groupId);
+ failures.put(groupId, new ApiError(Errors.GROUP_DELETION_FAILED, cause.getMessage()));
+ } else {
+ backoff.clear(groupId);
+ }
+ });
+ return failures;
+ });
+ })
+ .exceptionally(exception -> {
+ // A failure to read the streams group state (NOT_COORDINATOR etc.) is treated
+ // the same as a plugin failure: every retained group is held back from
+ // tombstoning so the caller can retry. The downstream chain prefers the more
+ // specific error from the runtime over GROUP_DELETION_FAILED.
+ Errors error = Errors.forException(exception);
Review Comment:
Two things on the back-off handling here.
(1) Lifecycle: the success/normal path clears the back-off for each group,
but this exceptionally branch (read-op failure) doesn't, so a group with an
armed back-off keeps a stale entry. More generally, the back-off map is only
ever cleared from this class's call sites, so entries leak for any group
removed by a path other than this hook (expiry, partition unload,
tombstone-via-replay) — unbounded growth keyed by dead groupIds over a
long-running broker. Back-off eviction probably wants to hang off the group's
real lifecycle teardown rather than the delete flow.
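To illustrate point (1), a sketch of a back-off whose eviction is decoupled from the delete flow. The real StreamsGroupTopologyDescriptionBackoff is not part of this diff, so BackoffLifecycleSketch, its Window record, the fixed windowMs and the onGroupRemoved hook are all hypothetical, and a LongSupplier clock stands in for Kafka's Time:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

class BackoffLifecycleSketch {
    // Hypothetical per-group state: the epoch the window was armed for and when it ends.
    record Window(int epoch, long deadlineMs) { }

    private final ConcurrentHashMap<String, Window> windows = new ConcurrentHashMap<>();
    private final LongSupplier nowMs;
    private final long windowMs;

    BackoffLifecycleSketch(LongSupplier nowMs, long windowMs) {
        this.nowMs = nowMs;
        this.windowMs = windowMs;
    }

    // Atomic check-and-arm: compute() applies the remapping function once per call,
    // atomically per key, so two concurrent callers cannot both arm for one epoch.
    boolean armIfNotActive(String groupId, int epoch) {
        long now = nowMs.getAsLong();
        boolean[] armed = {false};
        windows.compute(groupId, (id, current) -> {
            if (current != null && current.epoch() == epoch && current.deadlineMs() > now) {
                return current; // still backing off for this epoch
            }
            armed[0] = true;
            return new Window(epoch, now + windowMs);
        });
        return armed[0];
    }

    // Single eviction point, meant to be called from the group's own teardown
    // (expiry, partition unload, replayed tombstone), not only from DeleteGroups.
    void onGroupRemoved(String groupId) {
        windows.remove(groupId);
    }

    int trackedGroups() {
        return windows.size();
    }
}
```

Where exactly onGroupRemoved would be wired in is left open here; the point of the sketch is only that eviction has one entry point that is independent of the plugin delete hook.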
(2) While here, the gather shape above is heavier than needed: we store
CompletableFuture<Throwable> keyed by groupId (the .handle throws the groupId
away), then re-walk the map and join each to rebuild the failures map. Having
each future produce its own keyed result — do the clear/log/ApiError inside the
.handle and return a Map.entry (or null) per group, then allOf + collect —
keeps the single-threaded aggregation but drops the separate calls map and the
throwaway-then-rebuild of the groupId. That restructuring also naturally moves
backoff.clear into the per-future handle, which fixes the skipped-clear in (1).
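A sketch of the gather shape suggested in (2), assuming a plain Function<String, CompletableFuture<Void>> in place of the plugin's deleteTopology and a concurrent Set<String> in place of the back-off (both hypothetical); Failure stands in for ApiError. Each per-group future carries its own groupId, clears the back-off on every outcome, and yields a Map.Entry (or null on success), so no separate calls map is needed:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

class PreDeleteGatherSketch {
    // Hypothetical stand-in for ApiError(Errors.GROUP_DELETION_FAILED, message).
    record Failure(String error, String message) { }

    static CompletableFuture<Map<String, Failure>> deleteTopologies(
            Collection<String> groupsWithStored,
            Function<String, CompletableFuture<Void>> deleteTopology, // hypothetical plugin call
            Set<String> backoff) {                                     // hypothetical back-off keys
        List<CompletableFuture<Map.Entry<String, Failure>>> perGroup = new ArrayList<>();
        for (String groupId : groupsWithStored) {
            CompletableFuture<Void> call;
            try {
                call = deleteTopology.apply(groupId);
            } catch (Throwable t) {
                call = CompletableFuture.failedFuture(t); // plugin threw synchronously
            }
            // Each future resolves to its own keyed result and clears the back-off
            // regardless of outcome.
            CompletableFuture<Map.Entry<String, Failure>> keyed = call.handle((unused, t) -> {
                backoff.remove(groupId);
                if (t == null) {
                    return null;
                }
                Throwable cause = t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
                return Map.entry(groupId, new Failure("GROUP_DELETION_FAILED", cause.getMessage()));
            });
            perGroup.add(keyed);
        }
        return CompletableFuture.allOf(perGroup.toArray(new CompletableFuture<?>[0]))
            .thenApply(unused -> {
                // Aggregation stays single-threaded: every future is complete at this point.
                Map<String, Failure> failures = new HashMap<>();
                for (CompletableFuture<Map.Entry<String, Failure>> f : perGroup) {
                    Map.Entry<String, Failure> entry = f.join();
                    if (entry != null) {
                        failures.put(entry.getKey(), entry.getValue());
                    }
                }
                return failures;
            });
    }
}
```

Because the back-off is touched from whichever thread completes the plugin future, the sketch assumes a thread-safe set (for example ConcurrentHashMap.newKeySet()).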
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+ .exceptionally(exception -> {
+ // A failure to read the streams group state (NOT_COORDINATOR etc.) is treated
+ // the same as a plugin failure: every retained group is held back from
+ // tombstoning so the caller can retry. The downstream chain prefers the more
+ // specific error from the runtime over GROUP_DELETION_FAILED.
+ Errors error = Errors.forException(exception);
+ Map<String, ApiError> failures = new HashMap<>();
+ for (String groupId : groupIds) {
+ failures.put(groupId, new ApiError(error, exception.getMessage()));
Review Comment:
exception here is the CompletionException from the chain, so
exception.getMessage() is the cause's toString() — the client ends up with an
ErrorMessage like "org.apache.kafka.common.errors.NotCoordinatorException:
...". ApiError.fromThrowable(exception) unwraps it and also suppresses the
message for UNKNOWN_SERVER_ERROR, and it's what the sibling DeleteGroups
handlers use.
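The wrapping described above can be reproduced with the JDK alone. In this sketch, an exceptionally handler placed after a dependent stage receives a CompletionException whose message is the cause's toString(); unwrap is a hypothetical helper showing the kind of unwrapping the comment attributes to ApiError.fromThrowable, not Kafka's implementation:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CompletionMessageSketch {
    // Returns the message an exceptionally() handler sees when an upstream stage fails.
    static String messageSeenDownstream(Throwable failure) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> chained = source
            .thenApply(s -> s)                    // dependent stage wraps the failure
            .exceptionally(Throwable::getMessage); // sees the CompletionException
        source.completeExceptionally(failure);
        return chained.join();
    }

    // Hypothetical helper: strip a CompletionException wrapper before reading the message.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

For a failure such as new IllegalStateException("not coordinator"), messageSeenDownstream returns "java.lang.IllegalStateException: not coordinator", which is the class-name-prefixed ErrorMessage the comment warns about.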
##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
public record Source(String name, Set<String> topics, Set<String> successors) implements Node {
public Source {
Objects.requireNonNull(name, "name");
- topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
- successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+ topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+ successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
}
public record Processor(String name, Set<String> stores, Set<String> successors) implements Node {
public Processor {
Objects.requireNonNull(name, "name");
- stores = Set.copyOf(Objects.requireNonNull(stores, "stores"));
- successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+ stores = Collections.unmodifiableSet(Objects.requireNonNull(stores, "stores"));
+ successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
Review Comment:
Same immutability point as the Source node above, and it is a #22552 change
— let's resolve it on that PR.
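For reference, the difference behind the immutability point, shown with simplified hypothetical records (ViewNode, CopyNode) rather than the real Source/Processor: Collections.unmodifiableSet returns a read-only view of the caller's set, while Set.copyOf takes a defensive, unmodifiable copy:

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

class ImmutabilitySketch {
    // Wraps the caller's set: read-only through this reference, but later
    // mutations of the original set remain visible.
    record ViewNode(String name, Set<String> topics) {
        ViewNode {
            Objects.requireNonNull(name, "name");
            topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
        }
    }

    // Takes a defensive copy: later mutations of the original set are not visible.
    // Set.copyOf also rejects null elements with a NullPointerException.
    record CopyNode(String name, Set<String> topics) {
        CopyNode {
            Objects.requireNonNull(name, "name");
            topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
        }
    }
}
```

If the caller keeps and later mutates the set it passed in, only the copying variant keeps the record's contents (and therefore its equals/hashCode) stable.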
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator calls
+ * into — heartbeat post-processing, the push RPC, and the {@code DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is rebuilt from
+ * scratch on broker restart, and the persisted {@code StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor}) so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+ private final Logger log;
+ private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+ private final StreamsGroupTopologyDescriptionBackoff backoff;
+ private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
+ private final Function<String, TopicPartition> topicPartitionFor;
+
+ public TopologyDescriptionManager(
+ LogContext logContext,
+ Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+ Function<String, TopicPartition> topicPartitionFor,
+ Time time
+ ) {
+ this.log = logContext.logger(TopologyDescriptionManager.class);
+ this.plugin = plugin;
+ this.runtime = runtime;
+ this.topicPartitionFor = topicPartitionFor;
+ this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+ }
+
+ /**
+ * @return true if a topology description plugin is configured on this broker.
+ */
+ public boolean isPluginConfigured() {
+ return plugin.isPresent();
+ }
+
+ /**
+ * Post-processes a successful streams group heartbeat result by deciding whether the
+ * broker should set {@code TopologyDescriptionRequired=true} on the response, and
+ * arming the per-group back-off when it does.
+ *
+ * <p>The flag is set when the topology description plugin is configured, the group
+ * has resolved to a topology epoch, that epoch is neither stored nor permanently
+ * failed at the plugin, no back-off is in effect for this epoch, and the response
+ * does not carry a {@code STALE_TOPOLOGY} status (the member would just be told to
+ * catch up first). When the response already carries an error code we leave it
+ * alone.
+ */
+ public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+ StreamsGroupHeartbeatResult result,
+ String groupId
+ ) {
+ if (plugin.isEmpty()) {
+ return result;
+ }
+ StreamsGroupHeartbeatResponseData response = result.data();
+ if (response.errorCode() != Errors.NONE.code()) {
+ return result;
+ }
+ int currentEpoch = result.currentTopologyEpoch();
+ if (currentEpoch < 0
+ || result.storedDescriptionTopologyEpoch() == currentEpoch
+ || result.failedDescriptionTopologyEpoch() == currentEpoch
+ || responseHasStaleTopology(response)) {
+ return result;
+ }
+ // Atomic check-and-arm: only set the flag if the back-off window is not already
+ // in effect for this epoch, so two concurrent heartbeats for the same group cannot
+ // both arm the back-off and double the window beyond its intended length.
+ if (backoff.armIfNotActive(groupId, currentEpoch)) {
+ response.setTopologyDescriptionRequired(true);
+ }
+ return result;
+ }
+
+ /**
+ * Reject the request synchronously when no plugin is configured or the request fails
+ * basic structural validation. Returns the response to send back to the client, or
+ * empty when the request is accepted for further processing. The caller is expected
+ * to have already short-circuited on a non-active coordinator.
+ */
+ public Optional<StreamsGroupTopologyDescriptionUpdateResponseData> preCheckTopologyDescriptionUpdate(
+ StreamsGroupTopologyDescriptionUpdateRequestData request
+ ) {
+ if (plugin.isEmpty()) {
+ return Optional.of(errorResponse(
+ Errors.UNSUPPORTED_VERSION,
+ "The broker has no streams group topology description plugin configured."
+ ));
+ }
+ if (request.memberId() == null || request.memberId().isEmpty()) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId can't be empty."));
+ }
+ if (request.groupId() == null || request.groupId().isEmpty()) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId can't be empty."));
+ }
+ if (request.topologyDescription() == null) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "TopologyDescription can't be null."));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Drive the push chain: validate the (group, member), convert the wire payload, call
+ * the plugin, persist the outcome, and centralize back-off state mutations in a
+ * single {@code whenComplete}.
+ *
+ * <p>The chain carries the terminal disposition through an {@link AtomicReference}
+ * holder so {@code whenComplete} can act on it without having to reason about the
+ * response shape. Default is {@link BackoffAction#ARM}; any post-plugin failure
+ * (including a metadata-record write that fails after a successful plugin call)
+ * therefore re-arms the back-off and the next heartbeat re-solicits an idempotent
+ * re-push.
+ */
+ public CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> handleSetTopology(
+ StreamsGroupTopologyDescriptionUpdateRequestData request
+ ) {
+ final String groupId = request.groupId();
+ final String memberId = request.memberId();
+ final int pushedEpoch = request.topologyEpoch();
+ final TopicPartition tp = topicPartitionFor.apply(groupId);
+ final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+ final AtomicReference<BackoffAction> backoffAction = new AtomicReference<>(BackoffAction.ARM);
+ return runtime.scheduleReadOperation(
+ "streams-group-topology-description-validate",
+ tp,
+ (coordinator, lastCommittedOffset) -> {
+ coordinator.validateStreamsGroupMember(groupId, memberId, lastCommittedOffset);
+ return null;
+ })
+ .thenApply(__ -> StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+ .thenCompose(description -> invokePluginSetTopology(p, groupId, pushedEpoch, description))
+ .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+ case SUCCESS -> runtime.scheduleWriteOperation(
+ "streams-group-set-stored-topology-epoch",
+ tp,
+ coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+ ).thenApply(unused -> {
+ backoffAction.set(BackoffAction.CLEAR);
+ return new StreamsGroupTopologyDescriptionUpdateResponseData();
+ });
+ case PERMANENT -> runtime.scheduleWriteOperation(
+ "streams-group-set-failed-topology-epoch",
+ tp,
+ coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+ ).thenApply(unused -> {
+ backoffAction.set(BackoffAction.CLEAR);
+ return errorResponse(
+ Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message());
+ });
+ case TRANSIENT -> CompletableFuture.completedFuture(errorResponse(
+ Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message()));
+ })
+ .whenComplete((response, throwable) -> {
+ if (backoffAction.get() == BackoffAction.CLEAR) {
+ backoff.clear(groupId);
+ } else {
+ backoff.armOrExtend(groupId, pushedEpoch);
+ }
+ });
Review Comment:
Good point: arming the back-off on a routing failure (NOT_COORDINATOR etc.),
where no group decision was actually made, does leave a stale broker-wide entry.
This whenComplete is the push path from #22552, so the fix belongs there. It
also ties into the broader back-off lifecycle issue: entries are only ever
cleared from this class.
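One possible shape for the fix on #22552, sketched standalone under several assumptions. `RoutingException` is a hypothetical stand-in for the coordinator-routing errors (in Kafka these would be exceptions such as `NotCoordinatorException`). The back-off is modeled as a plain `groupId -> epoch` map rather than `StreamsGroupTopologyDescriptionBackoff`. `withBackoff` is a hypothetical helper. The terminal `whenComplete` leaves the back-off untouched when the chain failed for a routing reason, and otherwise keeps the existing ARM/CLEAR behavior:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class BackoffDispositionSketch {
    // Hypothetical stand-in for coordinator-routing errors such as NOT_COORDINATOR.
    static class RoutingException extends RuntimeException {
        RoutingException(String message) {
            super(message);
        }
    }

    enum BackoffAction { ARM, CLEAR }

    // Hypothetical back-off state: groupId -> topology epoch the back-off is armed for.
    final Map<String, Integer> armed = new ConcurrentHashMap<>();

    // Dependent stages wrap failures in CompletionException; peel that layer off.
    static Throwable unwrap(Throwable t) {
        return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
    }

    // Attaches the terminal back-off update to the push chain. A routing failure means
    // no decision was made for the group on this broker, so the state is left alone.
    <T> CompletableFuture<T> withBackoff(
        String groupId,
        int epoch,
        AtomicReference<BackoffAction> action,
        CompletableFuture<T> chain
    ) {
        return chain.whenComplete((response, throwable) -> {
            if (throwable != null && unwrap(throwable) instanceof RoutingException) {
                return;
            }
            if (action.get() == BackoffAction.CLEAR) {
                armed.remove(groupId);
            } else {
                armed.put(groupId, epoch);
            }
        });
    }
}
```

Skipping the update, rather than clearing, keeps any entry that another broker-local push may legitimately have armed. Whether routing errors should clear instead is part of the lifecycle question above.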
##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
public record Source(String name, Set<String> topics, Set<String> successors) implements Node {
public Source {
Objects.requireNonNull(name, "name");
- topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
- successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+ topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+ successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
}
public record Processor(String name, Set<String> stores, Set<String> successors) implements Node {
public Processor {
Objects.requireNonNull(name, "name");
- stores = Set.copyOf(Objects.requireNonNull(stores, "stores"));
- successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+ stores = Collections.unmodifiableSet(Objects.requireNonNull(stores, "stores"));
+ successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
}
public record Sink(String name, Optional<String> topic, Set<String> successors) implements Node {
public Sink {
Objects.requireNonNull(name, "name");
Objects.requireNonNull(topic, "topic");
- successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+ successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
Review Comment:
Same as Source/Processor; this belongs to the #22552 discussion.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1434,9 +1501,17 @@ public CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
return CompletableFuture.completedFuture(deletableGroupResults);
}
- return handleDeleteGroups(context, topicPartition, retainedGroupIds)
- .whenComplete((resp, __) -> resp.forEach(result -> deletableGroupResults.add(result.duplicate())))
- .thenApply(__ -> deletableGroupResults);
+ return topologyDescriptionManager.deleteBeforeGroupDelete(topicPartition, retainedGroupIds)
Review Comment:
The new filterStreamsTopologyErrors + thenCompose stage runs synchronously after
deleteBeforeGroupDelete's own exceptionally, and nothing wraps this stage in an
exceptionally of its own: the original comment ("deleteShareGroups has its own
exceptionally block, so we don't need one here") no longer holds for the new
code. If filterStreamsTopologyErrors throws, the per-partition future completes
exceptionally and FutureUtils.combineFutures join()s it, which fails the entire
DeleteGroups response, including groups on other partitions that succeeded. The
`new ArrayList<>(groupIds.size() - streamsErrMap.size())` capacity is one
concrete throw path: if the subset invariant ever breaks, the capacity goes
negative and the constructor throws IllegalArgumentException.
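Here is a minimal standalone sketch of the containment being asked for. The names are hypothetical (`filter`, `deletePartition`, `combine`), and error codes are modeled as plain strings. Each per-partition future ends in its own `exceptionally`. A throw in the filtering step (here, the negative `ArrayList` capacity) is therefore reported as a per-group error for that partition only, and the final join over all partitions cannot fail. Mapping the failure to `UNKNOWN_SERVER_ERROR` is an assumption for the sketch; the real code might derive the error from the throwable instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class PerPartitionIsolationSketch {
    // Hypothetical filter: keeps the group ids that have no recorded error. Like the
    // reviewed code, it sizes the list as groupIds.size() - errors.size(), which throws
    // IllegalArgumentException once errors has more entries than groupIds.
    static List<String> filter(List<String> groupIds, Map<String, String> errors) {
        List<String> retained = new ArrayList<>(groupIds.size() - errors.size());
        for (String groupId : groupIds) {
            if (!errors.containsKey(groupId)) {
                retained.add(groupId);
            }
        }
        return retained;
    }

    // Hypothetical per-partition pipeline. The trailing exceptionally turns any throw
    // in the chain into per-group errors for this partition only.
    static CompletableFuture<Map<String, String>> deletePartition(
        List<String> groupIds,
        Map<String, String> pluginErrors
    ) {
        return CompletableFuture.completedFuture(pluginErrors)
            .thenApply(errors -> {
                Map<String, String> results = new HashMap<>(errors);
                for (String groupId : filter(groupIds, errors)) {
                    results.put(groupId, "NONE"); // stands in for a successful tombstone
                }
                return results;
            })
            .exceptionally(throwable -> {
                Map<String, String> results = new HashMap<>();
                for (String groupId : groupIds) {
                    results.put(groupId, "UNKNOWN_SERVER_ERROR");
                }
                return results;
            });
    }

    // Combines all partitions; the join cannot throw because every input is already recovered.
    static Map<String, String> combine(List<CompletableFuture<Map<String, String>>> partitions) {
        CompletableFuture.allOf(partitions.toArray(new CompletableFuture<?>[0])).join();
        Map<String, String> all = new HashMap<>();
        for (CompletableFuture<Map<String, String>> partition : partitions) {
            all.putAll(partition.join());
        }
        return all;
    }
}
```

Dropping the explicit capacity (plain `new ArrayList<>()`) would remove this particular throw path. The per-partition `exceptionally` still guards against any other throw in the stage.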
##########
group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescription.java:
##########
@@ -58,24 +59,24 @@ public sealed interface Node {
public record Source(String name, Set<String> topics, Set<String> successors) implements Node {
public Source {
Objects.requireNonNull(name, "name");
- topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
- successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
+ topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));
+ successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
Review Comment:
This POJO change is from #22552 (this PR is stacked on it, so the line shows
up in the diff here too), and there is already a thread on it there. Let's keep
the discussion on #22552 rather than duplicating it in 3/3.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+ /**
+ * Call {@code plugin.deleteTopology} for every supplied group id that is a streams
+ * group carrying a stored topology description. Returns a per-group map of failures
+ * keyed by group id; groups absent from the map either had no plugin state or the
+ * plugin call succeeded. Empty (no-op) when no plugin is configured. Used by the
+ * service-level {@code DeleteGroups} flow to gate tombstoning on plugin success.
+ */
+ public CompletableFuture<Map<String, ApiError>> deleteBeforeGroupDelete(
+ TopicPartition topicPartition,
+ List<String> groupIds
+ ) {
+ if (plugin.isEmpty()) {
+ return CompletableFuture.completedFuture(Map.of());
+ }
+ final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+ return runtime.scheduleReadOperation(
+ "streams-group-topology-pre-delete",
+ topicPartition,
+ (coordinator, lastCommittedOffset) ->
+ coordinator.streamsGroupsWithStoredTopologyDescription(groupIds, lastCommittedOffset))
+ .thenCompose(groupsWithStored -> {
+ if (groupsWithStored.isEmpty()) {
+ return CompletableFuture.completedFuture(Map.<String, ApiError>of());
+ }
+ Map<String, CompletableFuture<Throwable>> calls = new HashMap<>(groupsWithStored.size());
+ for (String groupId : groupsWithStored) {
+ CompletableFuture<Throwable> outcome;
+ try {
+ outcome = p.deleteTopology(groupId).handle((unused, throwable) -> throwable);
+ } catch (Throwable t) {
+ outcome = CompletableFuture.completedFuture(t);
+ }
+ calls.put(groupId, outcome);
+ }
+ CompletableFuture<?>[] all = calls.values().toArray(new CompletableFuture<?>[0]);
+ return CompletableFuture.allOf(all).thenApply(unused -> {
+ Map<String, ApiError> failures = new HashMap<>();
+ calls.forEach((groupId, throwableFuture) -> {
+ Throwable t = throwableFuture.join();
+ if (t != null) {
+ Throwable cause = t instanceof CompletionException && t.getCause() != null
Review Comment:
Same CompletionException unwrap as in invokePluginSetTopology; this is the
second occurrence. Errors.maybeUnwrapException(t) already does this unwrap (and
also handles ExecutionException), so both sites can call it instead of
hand-rolling the ternary.
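Here is a standalone sketch of the deduplicated shape, assuming a single shared unwrap helper. `unwrap` below is a local stand-in with the same intent as `Errors.maybeUnwrapException`; it keeps the hand-rolled version's fallback to `t` when there is no cause. `collectFailures` is a hypothetical reduction of the fan-out in `deleteBeforeGroupDelete`, and it returns failures as raw throwables instead of `ApiError`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class UnwrapSketch {
    // Local stand-in for a shared unwrap helper: peel one CompletionException or
    // ExecutionException layer, falling back to t itself when there is no cause.
    static Throwable unwrap(Throwable t) {
        if ((t instanceof CompletionException || t instanceof ExecutionException) && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    // Fan out: capture each call's outcome as a Throwable (null means success), wait
    // for all of them, and report only the failures, unwrapped in one place.
    static CompletableFuture<Map<String, Throwable>> collectFailures(
        Map<String, CompletableFuture<Void>> calls
    ) {
        Map<String, CompletableFuture<Throwable>> outcomes = new HashMap<>();
        calls.forEach((groupId, call) -> outcomes.put(groupId, call.handle((unused, t) -> t)));
        return CompletableFuture.allOf(outcomes.values().toArray(new CompletableFuture<?>[0]))
            .thenApply(unused -> {
                Map<String, Throwable> failures = new HashMap<>();
                outcomes.forEach((groupId, outcome) -> {
                    Throwable t = outcome.join();
                    if (t != null) {
                        failures.put(groupId, unwrap(t));
                    }
                });
                return failures;
            });
    }
}
```

Whether a failure reaches `handle` wrapped depends on where it was raised. A future failed directly passes the original exception, while a failure propagated through a dependent stage arrives as a `CompletionException`. Routing both through one helper removes that distinction from each call site.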
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
+import
org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.internals.LogContext;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
+import
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescription;
+import
org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
+import
org.apache.kafka.coordinator.group.api.streams.StreamsTopologyDescriptionPermanentFailureException;
+
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Broker-level component that owns everything tied to the streams-group
topology
+ * description plugin: the configured plugin reference, the per-group
+ * re-solicitation back-off, and the three entry points the group coordinator
calls
+ * into — heartbeat post-processing, the push RPC, and the {@code
DeleteGroups} hook.
+ *
+ * <p>This class is broker-level (one instance per {@code
GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions
hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is
rebuilt from
+ * scratch on broker restart, and the persisted {@code
StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive
+ * convergence after a restart.
+ *
+ * <p>Methods that schedule runtime operations require a partition resolver
supplied by
+ * the caller (typically {@code GroupCoordinatorService::topicPartitionFor})
so this
+ * class can stay decoupled from the offsets-topic partition layout.
+ */
+public class TopologyDescriptionManager {
+ private final Logger log;
+ private final Optional<StreamsGroupTopologyDescriptionPlugin> plugin;
+ private final StreamsGroupTopologyDescriptionBackoff backoff;
+ private final CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord>
runtime;
+ private final Function<String, TopicPartition> topicPartitionFor;
+
+ public TopologyDescriptionManager(
+ LogContext logContext,
+ Optional<StreamsGroupTopologyDescriptionPlugin> plugin,
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime,
+ Function<String, TopicPartition> topicPartitionFor,
+ Time time
+ ) {
+ this.log = logContext.logger(TopologyDescriptionManager.class);
+ this.plugin = plugin;
+ this.runtime = runtime;
+ this.topicPartitionFor = topicPartitionFor;
+ this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+ }
+
+ /**
+ * @return true if a topology description plugin is configured on this
broker.
+ */
+ public boolean isPluginConfigured() {
+ return plugin.isPresent();
+ }
+
+ /**
+ * Post-processes a successful streams group heartbeat result by deciding
whether the
+ * broker should set {@code TopologyDescriptionRequired=true} on the
response, and
+ * arming the per-group back-off when it does.
+ *
+ * <p>The flag is set when the topology description plugin is configured,
the group
+ * has resolved to a topology epoch, that epoch is neither stored nor
permanently
+ * failed at the plugin, no back-off is in effect for this epoch, and the
response
+ * does not carry a {@code STALE_TOPOLOGY} status (the member would just
be told to
+ * catch up first). When the response already carries an error code we
leave it
+ * alone.
+ */
+ public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+ StreamsGroupHeartbeatResult result,
+ String groupId
+ ) {
+ if (plugin.isEmpty()) {
+ return result;
+ }
+ StreamsGroupHeartbeatResponseData response = result.data();
+ if (response.errorCode() != Errors.NONE.code()) {
+ return result;
+ }
+ int currentEpoch = result.currentTopologyEpoch();
+ if (currentEpoch < 0
+ || result.storedDescriptionTopologyEpoch() == currentEpoch
+ || result.failedDescriptionTopologyEpoch() == currentEpoch
+ || responseHasStaleTopology(response)) {
+ return result;
+ }
+ // Atomic check-and-arm: only set the flag if the back-off window is
not already
+ // in effect for this epoch, so two concurrent heartbeats for the same
group cannot
+ // both arm the back-off and double the window beyond its intended
length.
+ if (backoff.armIfNotActive(groupId, currentEpoch)) {
+ response.setTopologyDescriptionRequired(true);
+ }
+ return result;
+ }
+
+ /**
+ * Reject the request synchronously when no plugin is configured or the
request fails
+ * basic structural validation. Returns the response to send back to the
client, or
+ * empty when the request is accepted for further processing. The caller
is expected
+ * to have already short-circuited on a non-active coordinator.
+ */
+ public Optional<StreamsGroupTopologyDescriptionUpdateResponseData>
preCheckTopologyDescriptionUpdate(
+ StreamsGroupTopologyDescriptionUpdateRequestData request
+ ) {
+ if (plugin.isEmpty()) {
+ return Optional.of(errorResponse(
+ Errors.UNSUPPORTED_VERSION,
+ "The broker has no streams group topology description plugin configured."
+ ));
+ }
+ if (request.memberId() == null || request.memberId().isEmpty()) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "MemberId can't be empty."));
+ }
+ if (request.groupId() == null || request.groupId().isEmpty()) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "GroupId can't be empty."));
+ }
+ if (request.topologyDescription() == null) {
+ return Optional.of(errorResponse(Errors.INVALID_REQUEST, "TopologyDescription can't be null."));
+ }
+ return Optional.empty();
+ }
+
+ /**
+ * Drive the push chain: validate the (group, member), convert the wire payload, call
+ * the plugin, persist the outcome, and centralize back-off state mutations in a
+ * single {@code whenComplete}.
+ *
+ * <p>The chain carries the terminal disposition through an {@link AtomicReference}
+ * holder so {@code whenComplete} can act on it without having to reason about the
+ * response shape. Default is {@link BackoffAction#ARM}; any post-plugin failure
+ * (including a metadata-record write that fails after a successful plugin call)
+ * therefore re-arms the back-off and the next heartbeat re-solicits an idempotent
+ * re-push.
+ */
+ public CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> handleSetTopology(
+ StreamsGroupTopologyDescriptionUpdateRequestData request
+ ) {
+ final String groupId = request.groupId();
+ final String memberId = request.memberId();
+ final int pushedEpoch = request.topologyEpoch();
+ final TopicPartition tp = topicPartitionFor.apply(groupId);
+ final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+
+ final AtomicReference<BackoffAction> backoffAction = new AtomicReference<>(BackoffAction.ARM);
+ return runtime.scheduleReadOperation(
+ "streams-group-topology-description-validate",
+ tp,
+ (coordinator, lastCommittedOffset) -> {
+ coordinator.validateStreamsGroupMember(groupId, memberId, lastCommittedOffset);
+ return null;
+ })
+ .thenApply(__ -> StreamsGroupTopologyDescriptionConverter.fromRequest(request.topologyDescription()))
+ .thenCompose(description -> invokePluginSetTopology(p, groupId, pushedEpoch, description))
+ .thenCompose(pluginOutcome -> switch (pluginOutcome.kind()) {
+ case SUCCESS -> runtime.scheduleWriteOperation(
+ "streams-group-set-stored-topology-epoch",
+ tp,
+ coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, false)
+ ).thenApply(unused -> {
+ backoffAction.set(BackoffAction.CLEAR);
+ return new StreamsGroupTopologyDescriptionUpdateResponseData();
+ });
+ case PERMANENT -> runtime.scheduleWriteOperation(
+ "streams-group-set-failed-topology-epoch",
+ tp,
+ coordinator -> coordinator.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, true)
+ ).thenApply(unused -> {
+ backoffAction.set(BackoffAction.CLEAR);
+ return errorResponse(
+ Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message());
+ });
+ case TRANSIENT -> CompletableFuture.completedFuture(errorResponse(
+ Errors.STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED, pluginOutcome.message()));
+ })
+ .whenComplete((response, throwable) -> {
+ if (backoffAction.get() == BackoffAction.CLEAR) {
+ backoff.clear(groupId);
+ } else {
+ backoff.armOrExtend(groupId, pushedEpoch);
+ }
+ });
+ }
+
+ /**
+ * Call {@code plugin.deleteTopology} for every supplied group id that is a streams
+ * group carrying a stored topology description. Returns a per-group map of failures
+ * keyed by group id; groups absent from the map either had no plugin state or the
+ * plugin call succeeded. Empty (no-op) when no plugin is configured. Used by the
+ * service-level {@code DeleteGroups} flow to gate tombstoning on plugin success.
+ public CompletableFuture<Map<String, ApiError>> deleteBeforeGroupDelete(
+ TopicPartition topicPartition,
+ List<String> groupIds
+ ) {
+ if (plugin.isEmpty()) {
+ return CompletableFuture.completedFuture(Map.of());
+ }
+ final StreamsGroupTopologyDescriptionPlugin p = plugin.get();
+ return runtime.scheduleReadOperation(
+ "streams-group-topology-pre-delete",
+ topicPartition,
+ (coordinator, lastCommittedOffset) ->
+ coordinator.streamsGroupsWithStoredTopologyDescription(groupIds, lastCommittedOffset))
+ .thenCompose(groupsWithStored -> {
+ if (groupsWithStored.isEmpty()) {
+ return CompletableFuture.completedFuture(Map.<String, ApiError>of());
+ }
+ Map<String, CompletableFuture<Throwable>> calls = new HashMap<>(groupsWithStored.size());
+ for (String groupId : groupsWithStored) {
+ CompletableFuture<Throwable> outcome;
+ try {
+ outcome = p.deleteTopology(groupId).handle((unused, throwable) -> throwable);
+ } catch (Throwable t) {
+ outcome = CompletableFuture.completedFuture(t);
+ }
+ calls.put(groupId, outcome);
+ }
+ CompletableFuture<?>[] all = calls.values().toArray(new CompletableFuture<?>[0]);
+ return CompletableFuture.allOf(all).thenApply(unused -> {
+ Map<String, ApiError> failures = new HashMap<>();
+ calls.forEach((groupId, throwableFuture) -> {
+ Throwable t = throwableFuture.join();
+ if (t != null) {
+ Throwable cause = t instanceof CompletionException && t.getCause() != null
+ ? t.getCause() : t;
+ log.warn("Topology description plugin failed to delete topology for streams group {}.",
+ groupId, cause);
+ backoff.clear(groupId);
+ failures.put(groupId, new ApiError(Errors.GROUP_DELETION_FAILED, cause.getMessage()));
+ } else {
+ backoff.clear(groupId);
+ }
+ });
+ return failures;
+ });
+ })
+ .exceptionally(exception -> {
+ // A failure to read the streams group state (NOT_COORDINATOR etc.) is treated
+ // the same as a plugin failure: every retained group is held back from
+ // tombstoning so the caller can retry. The downstream chain prefers the more
+ // specific error from the runtime over GROUP_DELETION_FAILED.
+ Errors error = Errors.forException(exception);
+ Map<String, ApiError> failures = new HashMap<>();
+ for (String groupId : groupIds) {
+ failures.put(groupId, new ApiError(error, exception.getMessage()));
+ }
+ return failures;
+ });
+ }
+
+ // Visible for testing.
+ StreamsGroupTopologyDescriptionBackoff backoff() {
+ return backoff;
+ }
+
+ private static boolean responseHasStaleTopology(StreamsGroupHeartbeatResponseData response) {
+ if (response.status() == null) {
+ return false;
+ }
+ byte staleCode = Status.STALE_TOPOLOGY.code();
+ return response.status().stream().anyMatch(s -> s.statusCode() == staleCode);
+ }
+
+ private static StreamsGroupTopologyDescriptionUpdateResponseData errorResponse(
+ Errors error,
+ String message
+ ) {
+ return new StreamsGroupTopologyDescriptionUpdateResponseData()
+ .setErrorCode(error.code())
+ .setErrorMessage(message);
+ }
+
+ /**
+ * Calls the plugin's {@code setTopology} and folds the result into a {@link PluginOutcome}.
+ * The future never completes exceptionally; the outcome carries the failure category.
+ private static CompletableFuture<PluginOutcome> invokePluginSetTopology(
+ StreamsGroupTopologyDescriptionPlugin plugin,
+ String groupId,
+ int pushedEpoch,
+ StreamsGroupTopologyDescription description
+ ) {
+ final CompletableFuture<Void> pluginFuture;
+ try {
+ pluginFuture = plugin.setTopology(groupId, pushedEpoch, description);
+ } catch (Throwable t) {
+ // A synchronous throw from the plugin is treated as a permanent failure with a
+ // generic client-visible message.
+ return CompletableFuture.completedFuture(PluginOutcome.permanent(t.getMessage()));
+ }
Review Comment:
Agreed, and this is already flagged on #22552, where invokePluginSetTopology
lives. The KIP asks for a generic message and t.getMessage() can be null, so we
should use a fixed string there rather than forwarding it. Will address on
#22552.
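A minimal sketch of the fixed-string approach described in the reply. `TopologyPlugin`, `Kind`, `PluginOutcome`, both message constants, the null-future check, and the rule that any asynchronous failure is transient are hypothetical simplifications, not the types or classification logic on #22552. It only illustrates that a synchronous throw becomes a permanent outcome whose client-visible message is a constant, so the message is never null and never echoes the plugin's exception text.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-ins; not the types from #22552.
final class SetTopologySketch {

    interface TopologyPlugin {
        CompletableFuture<Void> setTopology(String groupId, int topologyEpoch, String description);
    }

    enum Kind { SUCCESS, TRANSIENT, PERMANENT }

    record PluginOutcome(Kind kind, String message) { }

    // Fixed client-visible messages (hypothetical wording): never null and never
    // derived from the plugin's exception.
    static final String PERMANENT_MESSAGE = "The topology description plugin rejected the update.";
    static final String TRANSIENT_MESSAGE = "The topology description plugin is temporarily unavailable.";

    /** Folds the plugin call into an outcome; the returned future never completes exceptionally. */
    static CompletableFuture<PluginOutcome> invokePluginSetTopology(
        TopologyPlugin plugin,
        String groupId,
        int pushedEpoch,
        String description
    ) {
        final CompletableFuture<Void> pluginFuture;
        try {
            pluginFuture = plugin.setTopology(groupId, pushedEpoch, description);
        } catch (Throwable t) {
            // t.getMessage() may be null (e.g. new IllegalStateException()), so it is not
            // forwarded; t can still be logged server-side (logging omitted in this sketch).
            return CompletableFuture.completedFuture(new PluginOutcome(Kind.PERMANENT, PERMANENT_MESSAGE));
        }
        if (pluginFuture == null) {
            // Assumption of this sketch: a plugin that returns no future is treated like a throw.
            return CompletableFuture.completedFuture(new PluginOutcome(Kind.PERMANENT, PERMANENT_MESSAGE));
        }
        // Assumption of this sketch: every asynchronous failure is classified as transient.
        return pluginFuture.handle((unused, throwable) -> throwable == null
            ? new PluginOutcome(Kind.SUCCESS, null)
            : new PluginOutcome(Kind.TRANSIENT, TRANSIENT_MESSAGE));
    }
}
```

The exception object is still in scope in the `catch` block, so it can be logged on the broker without being returned to the client.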
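The atomic check-and-arm comment in `maybeSetTopologyDescriptionRequired` relies on `armIfNotActive` deciding and recording in a single step. The back-off class is not part of this hunk, so the following is a hypothetical sketch (`BackoffSketch`, `Window`, and the injected millisecond clock are illustrative names, not the PR's `StreamsGroupTopologyDescriptionBackoff`), built on `ConcurrentHashMap.compute`, which applies its remapping function atomically per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical back-off tracker; illustrates the check-and-arm idea only.
final class BackoffSketch {

    // The topology epoch a window was armed for and the time at which it expires.
    record Window(int topologyEpoch, long deadlineMs) { }

    private final ConcurrentHashMap<String, Window> windows = new ConcurrentHashMap<>();
    private final LongSupplier clockMs;
    private final long backoffMs;

    BackoffSketch(LongSupplier clockMs, long backoffMs) {
        this.clockMs = clockMs;
        this.backoffMs = backoffMs;
    }

    /**
     * Arms a window for (groupId, epoch) unless an unexpired window for the same epoch
     * already exists. Returns true only for the caller that armed it.
     */
    boolean armIfNotActive(String groupId, int epoch) {
        final long now = clockMs.getAsLong();
        final boolean[] armed = {false};
        // compute() runs the check and the write atomically for this key, so two
        // heartbeats for the same group cannot both see "no window" and both arm.
        windows.compute(groupId, (id, current) -> {
            if (current != null && current.topologyEpoch() == epoch && current.deadlineMs() > now) {
                return current; // already active for this epoch: leave it untouched
            }
            armed[0] = true;
            return new Window(epoch, now + backoffMs);
        });
        return armed[0];
    }

    void clear(String groupId) {
        windows.remove(groupId);
    }
}
```

A plain `get` followed by `put` would reopen exactly the race the comment describes, since both heartbeats could pass the check before either writes.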
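The `handleSetTopology` Javadoc describes a pessimistic default: the holder starts at `ARM`, only a successful chain flips it to `CLEAR`, and one `whenComplete` applies the decision. Below is a reduced, hypothetical sketch of that shape with the coordinator runtime and plugin replaced by suppliers (`DispositionSketch`, `pushWithBackoff`, and its parameters are illustrative, not names from the PR, and the PERMANENT/TRANSIENT branches are left out). The `AtomicReference` is used because lambdas can only capture effectively final locals and the stages may complete on different threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical reduction of the "default ARM, flip to CLEAR on success" pattern.
final class DispositionSketch {

    enum BackoffAction { ARM, CLEAR }

    /**
     * Runs a plugin call followed by a metadata write and reports exactly one back-off
     * decision when the chain finishes. Both suppliers are assumed to return a future
     * rather than throw.
     */
    static CompletableFuture<Void> pushWithBackoff(
        Supplier<CompletableFuture<Void>> pluginCall,
        Supplier<CompletableFuture<Void>> recordWrite,
        Consumer<BackoffAction> backoff
    ) {
        AtomicReference<BackoffAction> action = new AtomicReference<>(BackoffAction.ARM);
        return pluginCall.get()
            .thenCompose(unused -> recordWrite.get())
            // Reached only if both the plugin call and the write succeeded.
            .thenRun(() -> action.set(BackoffAction.CLEAR))
            // Runs on success and failure alike and passes the original outcome through.
            .whenComplete((unused, throwable) -> backoff.accept(action.get()));
    }
}
```

A write that fails after a successful plugin call never reaches `thenRun`, so the holder stays at `ARM` and the back-off is re-armed.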
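`deleteBeforeGroupDelete` waits for every `deleteTopology` call instead of failing on the first error, because `handle((unused, throwable) -> throwable)` turns each outcome into a normal value before `allOf`. A stripped-down, hypothetical sketch of that fan-out, independent of the coordinator types (`FanOutSketch` and `callAllAndCollectFailures` are illustrative names), including the `CompletionException` unwrapping for failures raised in dependent stages:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical reduction of the per-group fan-out in deleteBeforeGroupDelete.
final class FanOutSketch {

    /**
     * Starts one call per key and completes with a map from key to failure cause once
     * every call has finished. Keys whose call succeeded are absent from the map.
     */
    static CompletableFuture<Map<String, Throwable>> callAllAndCollectFailures(
        List<String> keys,
        Function<String, CompletableFuture<Void>> call
    ) {
        Map<String, CompletableFuture<Throwable>> calls = new HashMap<>();
        for (String key : keys) {
            CompletableFuture<Throwable> outcome;
            try {
                // handle() maps both completions to a normal value (null on success),
                // so allOf() below waits for every call instead of failing fast.
                outcome = call.apply(key).handle((unused, throwable) -> throwable);
            } catch (Throwable t) {
                // A call that throws before returning a future is recorded the same way.
                outcome = CompletableFuture.completedFuture(t);
            }
            calls.put(key, outcome);
        }
        return CompletableFuture.allOf(calls.values().toArray(new CompletableFuture<?>[0]))
            .thenApply(unused -> {
                Map<String, Throwable> failures = new HashMap<>();
                calls.forEach((key, outcome) -> {
                    Throwable t = outcome.join(); // already complete and never exceptional
                    if (t != null) {
                        // Failures thrown inside a dependent stage arrive wrapped in CompletionException.
                        Throwable cause = t instanceof CompletionException && t.getCause() != null
                            ? t.getCause() : t;
                        failures.put(key, cause);
                    }
                });
                return failures;
            });
    }
}
```

Because no per-key failure escapes as an exceptional completion, the caller always gets a complete failure map and can decide per group whether to proceed to tombstoning.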
--
This is an automated message from the Apache Git Service.