Copilot commented on code in PR #22413:
URL: https://github.com/apache/kafka/pull/22413#discussion_r3453587430


##########
core/src/test/scala/unit/kafka/server/StreamsGroupTopologyDescriptionUpdateRequestTest.scala:
##########
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, 
StreamsGroupTopologyDescriptionUpdateRequestData}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.common.requests.{StreamsGroupTopologyDescriptionUpdateRequest, 
StreamsGroupTopologyDescriptionUpdateResponse}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertTrue}
+
+import java.util
+import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  serverProperties = Array(
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG,
+      value = 
"org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = 
"0")
+  )
+)
+class StreamsGroupTopologyDescriptionUpdateRequestTest(cluster: 
ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
+
+  private def createStreamsGroup(groupId: String, topicName: String): Unit = {
+    val topology = new StreamsGroupHeartbeatRequestData.Topology()
+      .setEpoch(1)
+      .setSubtopologies(List(
+        new StreamsGroupHeartbeatRequestData.Subtopology()
+          .setSubtopologyId("subtopology-0")
+          .setSourceTopics(List(topicName).asJava)
+          .setRepartitionSinkTopics(List.empty.asJava)
+          .setRepartitionSourceTopics(List.empty.asJava)
+      ).asJava)
+
+    streamsGroupHeartbeat(
+      groupId = groupId,
+      memberId = "setup-member",
+      rebalanceTimeoutMs = 1000,
+      activeTasks = List.empty,
+      standbyTasks = List.empty,
+      warmupTasks = List.empty,
+      topology = topology,
+      expectedError = Errors.NONE
+    )
+  }
+
+  private def buildTopologyDescription(
+    subtopologies: 
List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology]
 = List.empty,
+    globalStores: 
List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore]
 = List.empty
+  ): StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription = {
+    new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription()
+      .setSubtopologies(subtopologies.asJava)
+      .setGlobalStores(globalStores.asJava)
+  }
+
+  private def sendUpdateTopologyDescription(
+    groupId: String,
+    topologyEpoch: Int,
+    topoDesc: 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription
+  ): StreamsGroupTopologyDescriptionUpdateResponse = {
+    val requestData = new StreamsGroupTopologyDescriptionUpdateRequestData()
+      .setGroupId(groupId)
+      .setTopologyEpoch(1).setTopologyDescription(new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription())
+      .setTopologyDescription(topoDesc)
+

Review Comment:
   `sendUpdateTopologyDescription` ignores the `topologyEpoch` parameter (it 
always sets epoch to 1) and it also sets `topologyDescription` twice, with the 
second call potentially setting it to `null` (which the broker-side validation 
rejects). This makes the helper unreliable and breaks the delete test path.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4567,13 +4604,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
 
         return new CoordinatorResult<>(
             List.of(record),
-            new StreamsGroupHeartbeatResult(
-                response,
-                Map.of(),
-                group.currentTopologyEpoch(),
-                group.storedDescriptionTopologyEpoch(),
-                group.failedDescriptionTopologyEpoch()
-            )
+            new StreamsGroupHeartbeatResult(response, Map.of(), 
group.currentTopologyEpoch(), -1, -1)
         );

Review Comment:
   `streamsGroupStaticMemberGroupLeave` also hard-codes 
`storedDescriptionTopologyEpoch` / `failedDescriptionTopologyEpoch` to `-1` in 
the returned `StreamsGroupHeartbeatResult`. This loses the broker-side plugin 
tracking state in the heartbeat response; it should preserve the group's 
current values just like other heartbeat/leave paths.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4492,13 +4535,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
             } else {
                 log.info("[GroupId {}][MemberId {}] Static member {} with 
instance id {} left the streams group.",
                     group.groupId(), memberId, memberId, instanceId);
-                return streamsGroupFenceMember(group, member, new 
StreamsGroupHeartbeatResult(
-                    response,
-                    Map.of(),
-                    group.currentTopologyEpoch(),
-                    group.storedDescriptionTopologyEpoch(),
-                    group.failedDescriptionTopologyEpoch()
-                ));
+                return streamsGroupFenceMember(group, member, new 
StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch(), 
-1, -1));

Review Comment:
   In the static-member leave path, the `StreamsGroupHeartbeatResult` is built 
with `storedDescriptionTopologyEpoch` / `failedDescriptionTopologyEpoch` 
hard-coded to `-1`. This diverges from the non-static path above and will 
incorrectly clear these fields in the heartbeat response, potentially 
preventing expected re-solicitation / gating behavior on the client side.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java:
##########
@@ -225,6 +246,78 @@ private StreamsGroupMemberDescription.Endpoint 
convertEndpoint(final StreamsGrou
         return new StreamsGroupMemberDescription.Endpoint(endpoint.host(), 
endpoint.port());
     }
 
+    private static final byte NODE_TYPE_SOURCE = 1;
+    private static final byte NODE_TYPE_PROCESSOR = 2;
+    private static final byte NODE_TYPE_SINK = 3;
+
+    private static StreamsGroupTopologyDescription convertTopologyDescription(
+            final StreamsGroupDescribeResponseData.TopologyDescription wire) {
+        final List<StreamsGroupTopologyDescription.Subtopology> subtopologies 
= new ArrayList<>(wire.subtopologies().size());
+        for (StreamsGroupDescribeResponseData.TopologyDescriptionSubtopology 
sub : wire.subtopologies()) {
+            subtopologies.add(new StreamsGroupTopologyDescription.Subtopology(
+                sub.subtopologyId(),
+                convertNodes(sub.nodes())
+            ));
+        }
+        final List<StreamsGroupTopologyDescription.GlobalStore> globalStores = 
new ArrayList<>(wire.globalStores().size());
+        for (StreamsGroupDescribeResponseData.TopologyDescriptionGlobalStore 
gs : wire.globalStores()) {
+            final StreamsGroupTopologyDescription.Node sourceNode = 
convertNode(gs.source(), Set.of(), Set.of());
+            final StreamsGroupTopologyDescription.Node processorNode = 
convertNode(gs.processor(), Set.of(), Set.of());
+            globalStores.add(new StreamsGroupTopologyDescription.GlobalStore(
+                (StreamsGroupTopologyDescription.Source) sourceNode,
+                (StreamsGroupTopologyDescription.Processor) processorNode
+            ));
+        }

Review Comment:
   Global store nodes are converted with empty predecessor/successor sets, 
which discards the wire-provided successor edges (and prevents reconstructing 
predecessors). This makes the resulting `StreamsGroupTopologyDescription` 
inconsistent with subtopology conversion and breaks formatting that relies on 
neighbours.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -786,6 +805,163 @@ private void 
throwIfStreamsGroupTopologyDescriptionUpdateInvalid(
         throwIfNull(request.topologyDescription(), "TopologyDescription can't 
be null.");
     }
 
+    /**
+     * Schedule the next topology-description cleanup tick on the broker-level 
{@link Timer}.
+     * The {@link TimerTask} self-reschedules from inside its own {@code run} 
so the cycle keeps
+     * firing every {@code offsets.retention.check.interval.ms} until {@code 
shutdown} flips
+     * {@code isActive} to false. No-op when no plugin is configured.
+     *
+     * <p>This lifecycle lives on the service rather than on
+     * {@link StreamsGroupTopologyDescriptionManager} because the cycle drives 
a runtime chain
+     * (per-shard read → per-group plugin call → conditional metadata write); 
per the existing
+     * separation, the manager exposes plugin-invocation and back-off building 
blocks and the
+     * service assembles the chain.
+     */
+    private void scheduleStreamsGroupTopologyCleanupCycle() {
+        if (!streamsGroupTopologyDescriptionManager.isPluginConfigured()) 
return;
+        if (!isActive.get()) return;
+        TimerTask task = new 
TimerTask(config.offsetsRetentionCheckIntervalMs()) {
+            @Override
+            public void run() {
+                if (!isActive.get()) return;
+                try {
+                    runStreamsGroupTopologyCleanupCycle();
+                } catch (Throwable t) {
+                    log.warn("Unexpected error running topology-description 
cleanup cycle.", t);
+                }
+                if (isActive.get()) scheduleStreamsGroupTopologyCleanupCycle();
+            }
+        };
+        streamsTopologyCleanupTask = task;
+        timer.add(task);
+    }
+
+    /**
+     * Drive one topology-description cleanup cycle: read every shard for 
streams groups
+     * eligible for plugin-side cleanup (empty + all offsets expired + 
storedEpoch != -1), call
+     * {@code plugin.deleteTopology} for each via the manager, then for every 
group whose
+     * plugin call succeeded write a conditional metadata record that clears
+     * {@code StoredDescriptionTopologyEpoch} only if the persisted value 
still matches the
+     * epoch we observed at scan time (so a concurrent {@code setTopology} 
that has advanced
+     * the field is preserved). Failed plugin calls retry on the next cycle; 
the next sweep
+     * then tombstones the now-empty group.
+     *
+     * <p>Single-flight: a cycle that fires while a previous one is still 
settling per-group
+     * futures is dropped with a warn-level log.
+     *
+     * <p><b>Concurrent setTopology race vs plugin.deleteTopology.</b> {@code 
plugin.deleteTopology}
+     * is keyed only on {@code groupId}. If a new member joins between the
+     * eligibility scan and the cycle's plugin call and pushes a fresh 
topology, the plugin's
+     * row is removed regardless of the new epoch — the conditional clear 
above no-ops on the
+     * metadata side, but the plugin-side data the member just wrote is gone. 
A subsequent
+     * {@code describe} → {@code getTopology} returns null and surfaces {@code 
NOT_STORED} with
+     * a warn log; this is the graceful-degradation path KIP-1331 accepts 
under the label
+     * "plugin-side data loss". The {@code isEmpty} requirement on the scan 
keeps the window
+     * narrow — concurrent setTopology requires a member to join an empty, 
fully-expired group
+     * between scan and delete — and the next heartbeat at the same epoch will 
not re-solicit
+     * (storedEpoch in metadata still reflects the new push), so the group 
converges on
+     * NOT_STORED without churn rather than chasing the lost plugin row.
+     */
+    // Visible for testing.
+    void runStreamsGroupTopologyCleanupCycle() {
+        if (!streamsGroupTopologyDescriptionManager.isPluginConfigured()) 
return;
+        if (!streamsTopologyCleanupCycleInFlight.compareAndSet(false, true)) {
+            log.warn("Topology-description cleanup cycle skipped: previous 
cycle is still in flight.");
+            return;
+        }
+        groupCoordinatorMetrics.recordSensor(
+            
GroupCoordinatorMetrics.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_CLEANUP_CYCLE_RUNS_SENSOR_NAME);
+
+        List<CompletableFuture<Map<String, Integer>>> partitionFutures = 
runtime.scheduleReadAllOperation(
+            "list-streams-groups-needing-topology-cleanup",
+            GroupCoordinatorShard::listStreamsGroupsNeedingTopologyCleanup
+        );
+
+        // ConcurrentLinkedQueue because per-partition .handle callbacks can 
append concurrently
+        // from whichever thread completed each runtime read.
+        Queue<CompletableFuture<?>> perGroupFutures = new 
ConcurrentLinkedQueue<>();
+        List<CompletableFuture<Void>> partitionDoneFutures = new 
ArrayList<>(partitionFutures.size());
+        for (CompletableFuture<Map<String, Integer>> partitionFuture : 
partitionFutures) {
+            partitionDoneFutures.add(partitionFuture.handle((eligible, 
throwable) -> {
+                if (throwable != null) {
+                    log.warn("Topology-description cleanup read failed for one 
partition.", throwable);
+                    return null;
+                }
+                if (eligible == null || eligible.isEmpty()) return null;
+                groupCoordinatorMetrics.recordSensor(
+                    
GroupCoordinatorMetrics.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_CLEANUP_ELIGIBLE_GROUPS_SENSOR_NAME,
+                    eligible.size()
+                );
+                perGroupFutures.add(streamsGroupTopologyDescriptionManager
+                    .invokeDeleteTopologies(eligible.keySet())
+                    .thenAccept(failures -> {
+                        recordPluginDeleteOutcome(eligible.size(), 
failures.size());
+                        eligible.forEach((groupId, expectedStoredEpoch) -> {
+                            // Eligibility required the group to be empty: no 
member is heartbeating
+                            // anymore, so any push back-off entry is moot 
regardless of plugin
+                            // outcome — dropping it now avoids carrying 
broker-wide state forward
+                            // for an id that no live member will solicit on.
+                            
streamsGroupTopologyDescriptionManager.clearBackoffGroup(groupId);
+                            if (failures.containsKey(groupId)) {
+                                // Plugin failed: leave stored epoch in place; 
next cycle retries.
+                                return;
+                            }
+                            clearStoredDescriptionTopologyEpochAsync(groupId, 
expectedStoredEpoch);
+                        });
+                    }));
+                return null;
+            }));
+        }
+
+        CompletableFuture.allOf(partitionDoneFutures.toArray(new 
CompletableFuture<?>[0]))
+            .thenCompose(__ -> 
CompletableFuture.allOf(perGroupFutures.toArray(new CompletableFuture<?>[0])))
+            .whenComplete((__, throwable) -> {
+                if (throwable != null) {
+                    log.warn("Topology-description cleanup cycle failed to 
complete cleanly.", throwable);
+                }
+                streamsTopologyCleanupCycleInFlight.set(false);
+            });

Review Comment:
   `streamsTopologyCleanupCycleInFlight` is released after waiting for 
`partitionDoneFutures` and `perGroupFutures`, but the asynchronous 
`clearStoredDescriptionTopologyEpochAsync(...)` writes are fire-and-forget and 
are not included in `perGroupFutures`. This means a subsequent cycle can start 
while conditional-clear writes from the previous cycle are still in flight (so 
the single-flight guarantee described in the method-level Javadoc does not 
actually cover those writes).



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResultTest.java:
##########
@@ -74,5 +62,6 @@ public void testCreatableTopicsMapIsImmutable() {
     public void testNullDataIsRejected() {
         assertThrows(NullPointerException.class,
             () -> new StreamsGroupHeartbeatResult(null, Map.of(), -1, -1, -1));
+        assertTrue(true);

Review Comment:
   The `assertTrue(true)` at the end of `testNullDataIsRejected` is a no-op and 
makes the test look like it’s asserting something additional when it isn’t.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1034,6 +1035,60 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    /**
+     * Whether the offset committed for {@code (groupId, topic, partition)} is 
currently
+     * eligible for expiration: its age exceeds {@code offsets.retention.ms} 
per the group's
+     * {@link OffsetExpirationCondition}, and there is no pending 
transactional offset on the
+     * partition. Shared between {@link #cleanupExpiredOffsets} (the write 
path that tombstones
+     * eligible offsets) and {@link #allOffsetsExpired} (the read-only check 
used by the
+     * topology-description cleanup cycle) so the two cannot drift.
+     */
+    private boolean isOffsetExpirable(
+        String groupId,
+        String topic,
+        int partition,
+        OffsetAndMetadata offsetAndMetadata,
+        OffsetExpirationCondition condition,
+        long currentTimestampMs
+    ) {
+        return condition.isOffsetExpired(offsetAndMetadata, 
currentTimestampMs, config.offsetsRetentionMs())
+            && !hasPendingTransactionalOffsets(groupId, topic, partition);
+    }
+
+    /**
+     * Read-only counterpart to {@link #cleanupExpiredOffsets(String, List)}: 
returns whether
+     * every committed offset for the group is currently eligible for 
expiration and no pending
+     * transactional offsets remain. Used by the topology-description plugin 
cleanup cycle on
+     * the eligibility read side, where the sweep must not mutate any record 
but still needs to
+     * decide whether the group is fully expirable before driving a {@code 
plugin.deleteTopology}.
+     */
+    public boolean allOffsetsExpired(String groupId, long currentTimestampMs) {
+        TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic =
+            offsets.offsetsByGroup.get(groupId);
+        if (offsetsByTopic == null) {
+            return !openTransactions.contains(groupId);
+        }
+        Group group = groupMetadataManager.group(groupId);
+        Optional<OffsetExpirationCondition> offsetExpirationCondition = 
group.offsetExpirationCondition();
+        if (offsetExpirationCondition.isEmpty()) {

Review Comment:
   `allOffsetsExpired` reads both offsets and group metadata at 
`Long.MAX_VALUE`/"latest" (via `offsetsByGroup.get(groupId)` and 
`groupMetadataManager.group(groupId)`), even though its main caller 
(`listStreamsGroupsNeedingTopologyCleanup`) is operating at a specific 
`committedOffset`. Mixing snapshot reads like this can make the eligibility 
decision inconsistent under concurrent updates (and `group(groupId)` can throw, 
forcing the scan to skip the group). Consider passing `committedOffset` into 
`allOffsetsExpired` and using timeline reads (`offsetsByGroup.get(groupId, 
committedOffset)` + `groupMetadataManager.maybeGroup(groupId, 
committedOffset)`) to keep the scan consistent.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java:
##########
@@ -411,6 +412,13 @@ private void addDefaultBrokerPropsIfAbsent(final 
Properties brokerConfig) {
         
brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1");
         
brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, 
true);
         brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, 
true);
+        final String pluginKey = 
GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG;
+        if ("".equals(brokerConfig.get(pluginKey))) {
+            // Empty string is a sentinel meaning "no plugin": remove so the 
broker sees null default.
+            brokerConfig.remove(pluginKey);
+        } else {
+            brokerConfig.putIfAbsent(pluginKey, 
InMemoryTopologyDescriptionPlugin.class.getName());
+        }

Review Comment:
   `EmbeddedKafkaCluster` now enables `InMemoryTopologyDescriptionPlugin` by 
default for *all* Streams integration tests (unless callers set the config key 
to an empty-string sentinel). This changes baseline broker behavior across a 
large number of tests (including scheduling the periodic topology-cleanup 
task), which risks introducing unrelated timing/flakiness changes. It’s safer 
to keep the default as "no plugin" and have only the topology-description tests 
opt in via `BROKER_CONFIG`.



##########
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##########
@@ -262,33 +265,61 @@ private void printGroupInfo(List<GroupListing> groups) {
             }
         }
 
-        public void describeGroups() throws ExecutionException, 
InterruptedException {
+        public boolean describeGroups() throws ExecutionException, 
InterruptedException {
             List<String> groupIds = opts.options.has(opts.allGroupsOpt)
                 ? new ArrayList<>(listStreamsGroups())
                 : new ArrayList<>(opts.options.valuesOf(opts.groupOpt));
+            boolean ok = true;
             if (!groupIds.isEmpty()) {
                 for (String groupId : groupIds) {
-                    StreamsGroupDescription description = 
getDescribeGroup(groupId);
+                    boolean wantTopology = opts.options.has(opts.topologyOpt);
+                    StreamsGroupDescription description = 
getDescribeGroup(groupId, wantTopology);
                     boolean verbose = opts.options.has(opts.verboseOpt);
                     if (opts.options.has(opts.membersOpt)) {
                         printMembers(description, verbose);
                     } else if (opts.options.has(opts.stateOpt)) {
                         printStates(description, verbose);
+                    } else if (wantTopology) {
+                        ok &= printTopology(description);
                     } else {
                         printOffsets(description, verbose);
                     }
                 }
             }
+            return ok;
         }
 
         StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
+            return getDescribeGroup(group, false);
+        }
+
+        StreamsGroupDescription getDescribeGroup(String group, boolean 
includeTopologyDescription) throws ExecutionException, InterruptedException {
             DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(
                 List.of(group),
-                withTimeoutMs(new DescribeStreamsGroupsOptions()));
+                withTimeoutMs(new DescribeStreamsGroupsOptions()
+                    .includeTopologyDescription(includeTopologyDescription)));
             Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
             return descriptionMap.get(group);
         }
 
+        private boolean printTopology(StreamsGroupDescription description) {
+            switch (description.topologyDescriptionStatus()) {
+                case AVAILABLE:
+                    
System.out.println(TopologyDescriptionFormatter.format(description.topologyDescription().get()));
+                    return true;

Review Comment:
   `printTopology` assumes `topologyDescription().get()` is always present when 
status is `AVAILABLE`. If the broker ever returns an inconsistent response 
(status `AVAILABLE` but an empty description `Optional`), this will throw 
`NoSuchElementException` and make the CLI crash instead of printing a helpful 
message and returning a non-zero status.



##########
tools/src/main/java/org/apache/kafka/tools/streams/TopologyDescriptionFormatter.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.streams;
+
+import org.apache.kafka.clients.admin.StreamsGroupTopologyDescription;
+
+import java.util.Collection;
+
+/**
+ * Formats a {@link StreamsGroupTopologyDescription} using the same layout as
+ * {@code Topology#describe().toString()} in the Kafka Streams API. This keeps 
the
+ * {@code kafka-streams-groups.sh --describe --topology} output familiar to 
users who
+ * have seen the client-side topology description before.
+ */

Review Comment:
   The Javadoc claims this uses the *same* layout as 
`Topology#describe().toString()`, but the formatter prints global stores under 
a separate `Global Stores:` section. The Streams 
`TopologyDescription#toString()` output interleaves global stores as 
`Sub-topology: <id> for global store ...`, so this is not the same layout.



##########
core/src/test/scala/unit/kafka/server/StreamsGroupTopologyDescriptionUpdateRequestTest.scala:
##########
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, 
StreamsGroupTopologyDescriptionUpdateRequestData}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.common.requests.{StreamsGroupTopologyDescriptionUpdateRequest, 
StreamsGroupTopologyDescriptionUpdateResponse}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotEquals, assertTrue}
+
+import java.util
+import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  serverProperties = Array(
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG,
+      value = 
"org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = 
"0")
+  )
+)
+class StreamsGroupTopologyDescriptionUpdateRequestTest(cluster: 
ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
+
+  private def createStreamsGroup(groupId: String, topicName: String): Unit = {
+    val topology = new StreamsGroupHeartbeatRequestData.Topology()
+      .setEpoch(1)
+      .setSubtopologies(List(
+        new StreamsGroupHeartbeatRequestData.Subtopology()
+          .setSubtopologyId("subtopology-0")
+          .setSourceTopics(List(topicName).asJava)
+          .setRepartitionSinkTopics(List.empty.asJava)
+          .setRepartitionSourceTopics(List.empty.asJava)
+      ).asJava)
+
+    streamsGroupHeartbeat(
+      groupId = groupId,
+      memberId = "setup-member",
+      rebalanceTimeoutMs = 1000,
+      activeTasks = List.empty,
+      standbyTasks = List.empty,
+      warmupTasks = List.empty,
+      topology = topology,
+      expectedError = Errors.NONE
+    )
+  }
+
+  private def buildTopologyDescription(
+    subtopologies: 
List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology]
 = List.empty,
+    globalStores: 
List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore]
 = List.empty
+  ): StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription = {
+    new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription()
+      .setSubtopologies(subtopologies.asJava)
+      .setGlobalStores(globalStores.asJava)
+  }
+
+  private def sendUpdateTopologyDescription(
+    groupId: String,
+    topologyEpoch: Int,
+    topoDesc: 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription
+  ): StreamsGroupTopologyDescriptionUpdateResponse = {
+    val requestData = new StreamsGroupTopologyDescriptionUpdateRequestData()
+      .setGroupId(groupId)
+      .setTopologyEpoch(1).setTopologyDescription(new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription())
+      .setTopologyDescription(topoDesc)
+
+    val request = new 
StreamsGroupTopologyDescriptionUpdateRequest.Builder(requestData)
+      
.build(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE.latestVersion(isUnstableApiEnabled))
+
+    connectAndReceive[StreamsGroupTopologyDescriptionUpdateResponse](request)
+  }
+
+  private def simpleSubtopology(): 
List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology]
 = {
+    val node = new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+      .setName("source-node")
+      .setNodeType(1.toByte)
+      .setSourceTopics(util.Arrays.asList("input-topic"))
+      .setStores(util.Collections.emptyList())
+      .setSuccessors(util.Arrays.asList("processor-node"))
+
+    val processorNode = new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode()
+      .setName("processor-node")
+      .setNodeType(2.toByte)
+      .setSourceTopics(util.Collections.emptyList())
+      .setStores(util.Collections.emptyList())
+      .setSuccessors(util.Collections.emptyList())
+
+    List(new 
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology()
+      .setSubtopologyId("subtopology-0")
+      .setNodes(util.Arrays.asList(node, processorNode)))
+  }
+
+  @ClusterTest
+  def testUpdateTopologyDescriptionWithPlugin(): Unit = {
+    val groupId = "test-streams-group"
+    val topicName = "input-topic"
+
+    createOffsetsTopic()
+    createTopic(topicName, 1)
+    createStreamsGroup(groupId, topicName)
+
+    val response = sendUpdateTopologyDescription(groupId, 1, 
buildTopologyDescription(simpleSubtopology()))
+    assertEquals(Errors.NONE.code, response.data.errorCode,
+      s"Expected NONE but got ${Errors.forCode(response.data.errorCode)}: 
${response.data.errorMessage}")
+
+    // Verify the plugin actually stored the topology
+    val plugin = InMemoryTopologyDescriptionPlugin.instances().asScala
+      .find(_.storedTopology(groupId).isPresent)
+    assertTrue(plugin.isDefined, "Expected at least one plugin instance to 
have the topology")
+    assertNotEquals(-1, plugin.get.storedDescriptionTopologyEpoch(groupId))
+    val storedTopo = plugin.get.storedTopology(groupId).get()
+    assertEquals(1, storedTopo.subtopologies.size)
+    assertEquals("subtopology-0", 
storedTopo.subtopologies.iterator().next().id())
+  }
+
+  @ClusterTest
+  def testUpdateTopologyDescriptionDeleteTopology(): Unit = {
+    val groupId = "test-streams-group-delete"
+    val topicName = "input-topic"
+
+    createOffsetsTopic()
+    createTopic(topicName, 1)
+    createStreamsGroup(groupId, topicName)
+
+    // First, set a topology
+    val setResponse = sendUpdateTopologyDescription(groupId, 1, 
buildTopologyDescription(simpleSubtopology()))
+    assertEquals(Errors.NONE.code, setResponse.data.errorCode)
+
+    // Verify the plugin has the topology
+    val plugin = InMemoryTopologyDescriptionPlugin.instances().asScala
+      .find(_.storedTopology(groupId).isPresent)
+    assertTrue(plugin.isDefined, "Expected plugin to have the topology after 
set")
+
+    // Then, delete the topology by sending null
+    val deleteResponse = sendUpdateTopologyDescription(groupId, 0, null)
+    assertEquals(Errors.NONE.code, deleteResponse.data.errorCode)
+
+    // Verify the plugin no longer has the topology
+    assertFalse(plugin.get.storedTopology(groupId).isPresent,
+      "Expected plugin to not have the topology after delete")

Review Comment:
   `testUpdateTopologyDescriptionDeleteTopology` assumes that sending 
`topologyDescription=null` performs a delete and returns `Errors.NONE`, but the 
broker-side handler currently rejects null topology descriptions 
(`TopologyDescription can't be null.`). As written, this test should fail (and 
it also doesn’t match the current RPC implementation, which only invokes 
`setTopology`, not `deleteTopology`).



##########
tests/kafkatest/tests/streams/base_streams_test.py:
##########
@@ -42,7 +42,8 @@ def __init__(self, test_context, topics, num_controllers=1, 
num_brokers=3):
             use_streams_groups=True,
             server_prop_overrides=[
                 [ "group.streams.min.session.timeout.ms", "10000" ], # Need to 
up the lower bound
-                [ "group.streams.session.timeout.ms", "10000" ] # As in 
classic groups, set this to 10s
+                [ "group.streams.session.timeout.ms", "10000" ], # As in 
classic groups, set this to 10s
+                [ "group.streams.topology.description.plugin.class", 
"org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin" ]
             ]

Review Comment:
   `BaseStreamsTest` is a shared base class for multiple ducktape Streams test 
suites. Enabling the in-memory topology description plugin here turns the 
feature on for all those tests, which can introduce unrelated background 
activity/cleanup-cycle behavior and make failures harder to attribute. Consider 
enabling the plugin only in the dedicated topology-description tests (or via a 
per-test override) instead of in the global base fixture.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to