Copilot commented on code in PR #22413: URL: https://github.com/apache/kafka/pull/22413#discussion_r3453587430
########## core/src/test/scala/unit/kafka/server/StreamsGroupTopologyDescriptionUpdateRequestTest.scala: ########## @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, StreamsGroupTopologyDescriptionUpdateRequestData} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{StreamsGroupTopologyDescriptionUpdateRequest, StreamsGroupTopologyDescriptionUpdateResponse} +import org.apache.kafka.common.test.ClusterInstance +import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue} + +import java.util +import scala.jdk.CollectionConverters._ + +@ClusterTestDefaults( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + value = "org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0") + ) +) +class StreamsGroupTopologyDescriptionUpdateRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + private def createStreamsGroup(groupId: String, topicName: String): Unit = { + val topology = new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(1) + .setSubtopologies(List( + new StreamsGroupHeartbeatRequestData.Subtopology() + .setSubtopologyId("subtopology-0") + .setSourceTopics(List(topicName).asJava) + .setRepartitionSinkTopics(List.empty.asJava) + .setRepartitionSourceTopics(List.empty.asJava) + ).asJava) + + streamsGroupHeartbeat( + groupId = groupId, + memberId = "setup-member", + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty, + topology = topology, + expectedError = Errors.NONE + ) + } + + private def buildTopologyDescription( + subtopologies: List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology] = List.empty, + globalStores: List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore] = List.empty + ): StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription = { + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription() + .setSubtopologies(subtopologies.asJava) + .setGlobalStores(globalStores.asJava) + } + + private def sendUpdateTopologyDescription( + groupId: String, + topologyEpoch: Int, + topoDesc: StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription + ): StreamsGroupTopologyDescriptionUpdateResponse = { + val requestData = new 
StreamsGroupTopologyDescriptionUpdateRequestData() + .setGroupId(groupId) + .setTopologyEpoch(1).setTopologyDescription(new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription()) + .setTopologyDescription(topoDesc) + Review Comment: `sendUpdateTopologyDescription` ignores the `topologyEpoch` parameter (it always sets epoch to 1) and it also sets `topologyDescription` twice, with the second call potentially setting it to `null` (which the broker-side validation rejects). This makes the helper unreliable and breaks the delete test path. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4567,13 +4604,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream return new CoordinatorResult<>( List.of(record), - new StreamsGroupHeartbeatResult( - response, - Map.of(), - group.currentTopologyEpoch(), - group.storedDescriptionTopologyEpoch(), - group.failedDescriptionTopologyEpoch() - ) + new StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch(), -1, -1) ); Review Comment: `streamsGroupStaticMemberGroupLeave` also hard-codes `storedDescriptionTopologyEpoch` / `failedDescriptionTopologyEpoch` to `-1` in the returned `StreamsGroupHeartbeatResult`. This loses the broker-side plugin tracking state in the heartbeat response; it should preserve the group's current values just like other heartbeat/leave paths. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -4492,13 +4535,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream } else { log.info("[GroupId {}][MemberId {}] Static member {} with instance id {} left the streams group.", group.groupId(), memberId, memberId, instanceId); - return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult( - response, - Map.of(), - group.currentTopologyEpoch(), - group.storedDescriptionTopologyEpoch(), - group.failedDescriptionTopologyEpoch() - )); + return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch(), -1, -1)); Review Comment: In the static-member leave path, the `StreamsGroupHeartbeatResult` is built with `storedDescriptionTopologyEpoch` / `failedDescriptionTopologyEpoch` hard-coded to `-1`. This diverges from the non-static path above and will incorrectly clear these fields in the heartbeat response, potentially preventing expected re-solicitation / gating behavior on the client side. 
########## clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeStreamsGroupsHandler.java: ########## @@ -225,6 +246,78 @@ private StreamsGroupMemberDescription.Endpoint convertEndpoint(final StreamsGrou return new StreamsGroupMemberDescription.Endpoint(endpoint.host(), endpoint.port()); } + private static final byte NODE_TYPE_SOURCE = 1; + private static final byte NODE_TYPE_PROCESSOR = 2; + private static final byte NODE_TYPE_SINK = 3; + + private static StreamsGroupTopologyDescription convertTopologyDescription( + final StreamsGroupDescribeResponseData.TopologyDescription wire) { + final List<StreamsGroupTopologyDescription.Subtopology> subtopologies = new ArrayList<>(wire.subtopologies().size()); + for (StreamsGroupDescribeResponseData.TopologyDescriptionSubtopology sub : wire.subtopologies()) { + subtopologies.add(new StreamsGroupTopologyDescription.Subtopology( + sub.subtopologyId(), + convertNodes(sub.nodes()) + )); + } + final List<StreamsGroupTopologyDescription.GlobalStore> globalStores = new ArrayList<>(wire.globalStores().size()); + for (StreamsGroupDescribeResponseData.TopologyDescriptionGlobalStore gs : wire.globalStores()) { + final StreamsGroupTopologyDescription.Node sourceNode = convertNode(gs.source(), Set.of(), Set.of()); + final StreamsGroupTopologyDescription.Node processorNode = convertNode(gs.processor(), Set.of(), Set.of()); + globalStores.add(new StreamsGroupTopologyDescription.GlobalStore( + (StreamsGroupTopologyDescription.Source) sourceNode, + (StreamsGroupTopologyDescription.Processor) processorNode + )); + } Review Comment: Global store nodes are converted with empty predecessor/successor sets, which discards the wire-provided successor edges (and prevents reconstructing predecessors). This makes the resulting `StreamsGroupTopologyDescription` inconsistent with subtopology conversion and breaks formatting that relies on neighbours. 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -786,6 +805,163 @@ private void throwIfStreamsGroupTopologyDescriptionUpdateInvalid( throwIfNull(request.topologyDescription(), "TopologyDescription can't be null."); } + /** + * Schedule the next topology-description cleanup tick on the broker-level {@link Timer}. + * The {@link TimerTask} self-reschedules from inside its own {@code run} so the cycle keeps + * firing every {@code offsets.retention.check.interval.ms} until {@code shutdown} flips + * {@code isActive} to false. No-op when no plugin is configured. + * + * <p>This lifecycle lives on the service rather than on + * {@link StreamsGroupTopologyDescriptionManager} because the cycle drives a runtime chain + * (per-shard read → per-group plugin call → conditional metadata write); per the existing + * separation, the manager exposes plugin-invocation and back-off building blocks and the + * service assembles the chain. 
+ */ + private void scheduleStreamsGroupTopologyCleanupCycle() { + if (!streamsGroupTopologyDescriptionManager.isPluginConfigured()) return; + if (!isActive.get()) return; + TimerTask task = new TimerTask(config.offsetsRetentionCheckIntervalMs()) { + @Override + public void run() { + if (!isActive.get()) return; + try { + runStreamsGroupTopologyCleanupCycle(); + } catch (Throwable t) { + log.warn("Unexpected error running topology-description cleanup cycle.", t); + } + if (isActive.get()) scheduleStreamsGroupTopologyCleanupCycle(); + } + }; + streamsTopologyCleanupTask = task; + timer.add(task); + } + + /** + * Drive one topology-description cleanup cycle: read every shard for streams groups + * eligible for plugin-side cleanup (empty + all offsets expired + storedEpoch != -1), call + * {@code plugin.deleteTopology} for each via the manager, then for every group whose + * plugin call succeeded write a conditional metadata record that clears + * {@code StoredDescriptionTopologyEpoch} only if the persisted value still matches the + * epoch we observed at scan time (so a concurrent {@code setTopology} that has advanced + * the field is preserved). Failed plugin calls retry on the next cycle; the next sweep + * then tombstones the now-empty group. + * + * <p>Single-flight: a cycle that fires while a previous one is still settling per-group + * futures is dropped with a warn-level log. + * + * <p><b>Concurrent setTopology race vs plugin.deleteTopology.</b> {@code plugin.deleteTopology} + * is keyed only on {@code groupId}. If a new member joins between the + * eligibility scan and the cycle's plugin call and pushes a fresh topology, the plugin's + * row is removed regardless of the new epoch — the conditional clear above no-ops on the + * metadata side, but the plugin-side data the member just wrote is gone. 
A subsequent + * {@code describe} → {@code getTopology} returns null and surfaces {@code NOT_STORED} with + * a warn log; this is the graceful-degradation path KIP-1331 accepts under the label + * "plugin-side data loss". The {@code isEmpty} requirement on the scan keeps the window + * narrow — concurrent setTopology requires a member to join an empty, fully-expired group + * between scan and delete — and the next heartbeat at the same epoch will not re-solicit + * (storedEpoch in metadata still reflects the new push), so the group converges on + * NOT_STORED without churn rather than chasing the lost plugin row. + */ + // Visible for testing. + void runStreamsGroupTopologyCleanupCycle() { + if (!streamsGroupTopologyDescriptionManager.isPluginConfigured()) return; + if (!streamsTopologyCleanupCycleInFlight.compareAndSet(false, true)) { + log.warn("Topology-description cleanup cycle skipped: previous cycle is still in flight."); + return; + } + groupCoordinatorMetrics.recordSensor( + GroupCoordinatorMetrics.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_CLEANUP_CYCLE_RUNS_SENSOR_NAME); + + List<CompletableFuture<Map<String, Integer>>> partitionFutures = runtime.scheduleReadAllOperation( + "list-streams-groups-needing-topology-cleanup", + GroupCoordinatorShard::listStreamsGroupsNeedingTopologyCleanup + ); + + // ConcurrentLinkedQueue because per-partition .handle callbacks can append concurrently + // from whichever thread completed each runtime read. 
+ Queue<CompletableFuture<?>> perGroupFutures = new ConcurrentLinkedQueue<>(); + List<CompletableFuture<Void>> partitionDoneFutures = new ArrayList<>(partitionFutures.size()); + for (CompletableFuture<Map<String, Integer>> partitionFuture : partitionFutures) { + partitionDoneFutures.add(partitionFuture.handle((eligible, throwable) -> { + if (throwable != null) { + log.warn("Topology-description cleanup read failed for one partition.", throwable); + return null; + } + if (eligible == null || eligible.isEmpty()) return null; + groupCoordinatorMetrics.recordSensor( + GroupCoordinatorMetrics.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_CLEANUP_ELIGIBLE_GROUPS_SENSOR_NAME, + eligible.size() + ); + perGroupFutures.add(streamsGroupTopologyDescriptionManager + .invokeDeleteTopologies(eligible.keySet()) + .thenAccept(failures -> { + recordPluginDeleteOutcome(eligible.size(), failures.size()); + eligible.forEach((groupId, expectedStoredEpoch) -> { + // Eligibility required the group to be empty: no member is heartbeating + // anymore, so any push back-off entry is moot regardless of plugin + // outcome — dropping it now avoids carrying broker-wide state forward + // for an id that no live member will solicit on. + streamsGroupTopologyDescriptionManager.clearBackoffGroup(groupId); + if (failures.containsKey(groupId)) { + // Plugin failed: leave stored epoch in place; next cycle retries. 
+ return; + } + clearStoredDescriptionTopologyEpochAsync(groupId, expectedStoredEpoch); + }); + })); + return null; + })); + } + + CompletableFuture.allOf(partitionDoneFutures.toArray(new CompletableFuture<?>[0])) + .thenCompose(__ -> CompletableFuture.allOf(perGroupFutures.toArray(new CompletableFuture<?>[0]))) + .whenComplete((__, throwable) -> { + if (throwable != null) { + log.warn("Topology-description cleanup cycle failed to complete cleanly.", throwable); + } + streamsTopologyCleanupCycleInFlight.set(false); + }); Review Comment: `streamsTopologyCleanupCycleInFlight` is released after waiting for `partitionDoneFutures` and `perGroupFutures`, but the asynchronous `clearStoredDescriptionTopologyEpochAsync(...)` writes are fire-and-forget and are not included in `perGroupFutures`. This means a subsequent cycle can start while conditional-clear writes from the previous cycle are still in flight (and the method-level Javadoc's single-flight guarantee — that a new cycle is dropped while a previous one is "still settling per-group futures" — does not currently cover these conditional-clear writes). ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResultTest.java: ########## @@ -74,5 +62,6 @@ public void testCreatableTopicsMapIsImmutable() { public void testNullDataIsRejected() { assertThrows(NullPointerException.class, () -> new StreamsGroupHeartbeatResult(null, Map.of(), -1, -1, -1)); + assertTrue(true); Review Comment: The `assertTrue(true)` at the end of `testNullDataIsRejected` is a no-op and makes the test look like it’s asserting something additional when it isn’t.
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -1034,6 +1035,60 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Whether the offset committed for {@code (groupId, topic, partition)} is currently + * eligible for expiration: its age exceeds {@code offsets.retention.ms} per the group's + * {@link OffsetExpirationCondition}, and there is no pending transactional offset on the + * partition. Shared between {@link #cleanupExpiredOffsets} (the write path that tombstones + * eligible offsets) and {@link #allOffsetsExpired} (the read-only check used by the + * topology-description cleanup cycle) so the two cannot drift. + */ + private boolean isOffsetExpirable( + String groupId, + String topic, + int partition, + OffsetAndMetadata offsetAndMetadata, + OffsetExpirationCondition condition, + long currentTimestampMs + ) { + return condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs()) + && !hasPendingTransactionalOffsets(groupId, topic, partition); + } + + /** + * Read-only counterpart to {@link #cleanupExpiredOffsets(String, List)}: returns whether + * every committed offset for the group is currently eligible for expiration and no pending + * transactional offsets remain. Used by the topology-description plugin cleanup cycle on + * the eligibility read side, where the sweep must not mutate any record but still needs to + * decide whether the group is fully expirable before driving a {@code plugin.deleteTopology}. 
+ */ + public boolean allOffsetsExpired(String groupId, long currentTimestampMs) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = + offsets.offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return !openTransactions.contains(groupId); + } + Group group = groupMetadataManager.group(groupId); + Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition(); + if (offsetExpirationCondition.isEmpty()) { Review Comment: `allOffsetsExpired` reads both offsets and group metadata at `Long.MAX_VALUE`/"latest" (via `offsetsByGroup.get(groupId)` and `groupMetadataManager.group(groupId)`), even though its main caller (`listStreamsGroupsNeedingTopologyCleanup`) is operating at a specific `committedOffset`. Mixing snapshot reads like this can make the eligibility decision inconsistent under concurrent updates (and `group(groupId)` can throw, forcing the scan to skip the group). Consider passing `committedOffset` into `allOffsetsExpired` and using timeline reads (`offsetsByGroup.get(groupId, committedOffset)` + `groupMetadataManager.maybeGroup(groupId, committedOffset)`) to keep the scan consistent. ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java: ########## @@ -411,6 +412,13 @@ private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) { brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true); brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, true); + final String pluginKey = GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG; + if ("".equals(brokerConfig.get(pluginKey))) { + // Empty string is a sentinel meaning "no plugin": remove so the broker sees null default. 
+ brokerConfig.remove(pluginKey); + } else { + brokerConfig.putIfAbsent(pluginKey, InMemoryTopologyDescriptionPlugin.class.getName()); + } Review Comment: `EmbeddedKafkaCluster` now enables `InMemoryTopologyDescriptionPlugin` by default for *all* Streams integration tests (unless callers set the config key to an empty-string sentinel). This changes baseline broker behavior across a large number of tests (including scheduling the periodic topology-cleanup task), which risks introducing unrelated timing/flakiness changes. It’s safer to keep the default as "no plugin" and have only the topology-description tests opt in via `BROKER_CONFIG`. ########## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ########## @@ -262,33 +265,61 @@ private void printGroupInfo(List<GroupListing> groups) { } } - public void describeGroups() throws ExecutionException, InterruptedException { + public boolean describeGroups() throws ExecutionException, InterruptedException { List<String> groupIds = opts.options.has(opts.allGroupsOpt) ? 
new ArrayList<>(listStreamsGroups()) : new ArrayList<>(opts.options.valuesOf(opts.groupOpt)); + boolean ok = true; if (!groupIds.isEmpty()) { for (String groupId : groupIds) { - StreamsGroupDescription description = getDescribeGroup(groupId); + boolean wantTopology = opts.options.has(opts.topologyOpt); + StreamsGroupDescription description = getDescribeGroup(groupId, wantTopology); boolean verbose = opts.options.has(opts.verboseOpt); if (opts.options.has(opts.membersOpt)) { printMembers(description, verbose); } else if (opts.options.has(opts.stateOpt)) { printStates(description, verbose); + } else if (wantTopology) { + ok &= printTopology(description); } else { printOffsets(description, verbose); } } } + return ok; } StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException { + return getDescribeGroup(group, false); + } + + StreamsGroupDescription getDescribeGroup(String group, boolean includeTopologyDescription) throws ExecutionException, InterruptedException { DescribeStreamsGroupsResult result = adminClient.describeStreamsGroups( List.of(group), - withTimeoutMs(new DescribeStreamsGroupsOptions())); + withTimeoutMs(new DescribeStreamsGroupsOptions() + .includeTopologyDescription(includeTopologyDescription))); Map<String, StreamsGroupDescription> descriptionMap = result.all().get(); return descriptionMap.get(group); } + private boolean printTopology(StreamsGroupDescription description) { + switch (description.topologyDescriptionStatus()) { + case AVAILABLE: + System.out.println(TopologyDescriptionFormatter.format(description.topologyDescription().get())); + return true; Review Comment: `printTopology` assumes `topologyDescription().get()` is always present when status is `AVAILABLE`. If the broker ever returns an inconsistent response (status AVAILABLE but description null), this will throw `NoSuchElementException` and make the CLI crash instead of printing a helpful message and returning a non-zero status. 
########## tools/src/main/java/org/apache/kafka/tools/streams/TopologyDescriptionFormatter.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.streams; + +import org.apache.kafka.clients.admin.StreamsGroupTopologyDescription; + +import java.util.Collection; + +/** + * Formats a {@link StreamsGroupTopologyDescription} using the same layout as + * {@code Topology#describe().toString()} in the Kafka Streams API. This keeps the + * {@code kafka-streams-groups.sh --describe --topology} output familiar to users who + * have seen the client-side topology description before. + */ Review Comment: The Javadoc claims this uses the *same* layout as `Topology#describe().toString()`, but the formatter prints global stores under a separate `Global Stores:` section. The Streams `TopologyDescription#toString()` output interleaves global stores as `Sub-topology: <id> for global store ...`, so this is not the same layout. ########## core/src/test/scala/unit/kafka/server/StreamsGroupTopologyDescriptionUpdateRequestTest.scala: ########## @@ -0,0 +1,331 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.utils.TestUtils +import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, StreamsGroupTopologyDescriptionUpdateRequestData} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{StreamsGroupTopologyDescriptionUpdateRequest, StreamsGroupTopologyDescriptionUpdateResponse} +import org.apache.kafka.common.test.ClusterInstance +import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertTrue} + +import java.util +import scala.jdk.CollectionConverters._ + +@ClusterTestDefaults( + types = Array(Type.KRAFT), + serverProperties = Array( + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + value = 
"org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin"), + new ClusterConfigProperty(key = GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0") + ) +) +class StreamsGroupTopologyDescriptionUpdateRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + + private def createStreamsGroup(groupId: String, topicName: String): Unit = { + val topology = new StreamsGroupHeartbeatRequestData.Topology() + .setEpoch(1) + .setSubtopologies(List( + new StreamsGroupHeartbeatRequestData.Subtopology() + .setSubtopologyId("subtopology-0") + .setSourceTopics(List(topicName).asJava) + .setRepartitionSinkTopics(List.empty.asJava) + .setRepartitionSourceTopics(List.empty.asJava) + ).asJava) + + streamsGroupHeartbeat( + groupId = groupId, + memberId = "setup-member", + rebalanceTimeoutMs = 1000, + activeTasks = List.empty, + standbyTasks = List.empty, + warmupTasks = List.empty, + topology = topology, + expectedError = Errors.NONE + ) + } + + private def buildTopologyDescription( + subtopologies: List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology] = List.empty, + globalStores: List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionGlobalStore] = List.empty + ): StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription = { + new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription() + .setSubtopologies(subtopologies.asJava) + .setGlobalStores(globalStores.asJava) + } + + private def sendUpdateTopologyDescription( + groupId: String, + topologyEpoch: Int, + topoDesc: StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription + ): StreamsGroupTopologyDescriptionUpdateResponse = { + val requestData = new StreamsGroupTopologyDescriptionUpdateRequestData() + .setGroupId(groupId) + .setTopologyEpoch(1).setTopologyDescription(new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription()) + 
.setTopologyDescription(topoDesc) + + val request = new StreamsGroupTopologyDescriptionUpdateRequest.Builder(requestData) + .build(ApiKeys.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE.latestVersion(isUnstableApiEnabled)) + + connectAndReceive[StreamsGroupTopologyDescriptionUpdateResponse](request) + } + + private def simpleSubtopology(): List[StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology] = { + val node = new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("source-node") + .setNodeType(1.toByte) + .setSourceTopics(util.Arrays.asList("input-topic")) + .setStores(util.Collections.emptyList()) + .setSuccessors(util.Arrays.asList("processor-node")) + + val processorNode = new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionNode() + .setName("processor-node") + .setNodeType(2.toByte) + .setSourceTopics(util.Collections.emptyList()) + .setStores(util.Collections.emptyList()) + .setSuccessors(util.Collections.emptyList()) + + List(new StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescriptionSubtopology() + .setSubtopologyId("subtopology-0") + .setNodes(util.Arrays.asList(node, processorNode))) + } + + @ClusterTest + def testUpdateTopologyDescriptionWithPlugin(): Unit = { + val groupId = "test-streams-group" + val topicName = "input-topic" + + createOffsetsTopic() + createTopic(topicName, 1) + createStreamsGroup(groupId, topicName) + + val response = sendUpdateTopologyDescription(groupId, 1, buildTopologyDescription(simpleSubtopology())) + assertEquals(Errors.NONE.code, response.data.errorCode, + s"Expected NONE but got ${Errors.forCode(response.data.errorCode)}: ${response.data.errorMessage}") + + // Verify the plugin actually stored the topology + val plugin = InMemoryTopologyDescriptionPlugin.instances().asScala + .find(_.storedTopology(groupId).isPresent) + assertTrue(plugin.isDefined, "Expected at least one plugin instance to have the topology") + assertNotEquals(-1, 
plugin.get.storedDescriptionTopologyEpoch(groupId)) + val storedTopo = plugin.get.storedTopology(groupId).get() + assertEquals(1, storedTopo.subtopologies.size) + assertEquals("subtopology-0", storedTopo.subtopologies.iterator().next().id()) + } + + @ClusterTest + def testUpdateTopologyDescriptionDeleteTopology(): Unit = { + val groupId = "test-streams-group-delete" + val topicName = "input-topic" + + createOffsetsTopic() + createTopic(topicName, 1) + createStreamsGroup(groupId, topicName) + + // First, set a topology + val setResponse = sendUpdateTopologyDescription(groupId, 1, buildTopologyDescription(simpleSubtopology())) + assertEquals(Errors.NONE.code, setResponse.data.errorCode) + + // Verify the plugin has the topology + val plugin = InMemoryTopologyDescriptionPlugin.instances().asScala + .find(_.storedTopology(groupId).isPresent) + assertTrue(plugin.isDefined, "Expected plugin to have the topology after set") + + // Then, delete the topology by sending null + val deleteResponse = sendUpdateTopologyDescription(groupId, 0, null) + assertEquals(Errors.NONE.code, deleteResponse.data.errorCode) + + // Verify the plugin no longer has the topology + assertFalse(plugin.get.storedTopology(groupId).isPresent, + "Expected plugin to not have the topology after delete") Review Comment: `testUpdateTopologyDescriptionDeleteTopology` assumes that sending `topologyDescription=null` performs a delete and returns `Errors.NONE`, but the broker-side handler currently rejects null topology descriptions (`TopologyDescription can't be null.`). As written, this test should fail (and it also doesn’t match the current RPC implementation, which only invokes `setTopology`, not `deleteTopology`). 
########## tests/kafkatest/tests/streams/base_streams_test.py: ########## @@ -42,7 +42,8 @@ def __init__(self, test_context, topics, num_controllers=1, num_brokers=3): use_streams_groups=True, server_prop_overrides=[ [ "group.streams.min.session.timeout.ms", "10000" ], # Need to up the lower bound - [ "group.streams.session.timeout.ms", "10000" ] # As in classic groups, set this to 10s + [ "group.streams.session.timeout.ms", "10000" ], # As in classic groups, set this to 10s + [ "group.streams.topology.description.plugin.class", "org.apache.kafka.server.streams.InMemoryTopologyDescriptionPlugin" ] ] Review Comment: `BaseStreamsTest` is a shared base class for multiple ducktape Streams test suites. Enabling the in-memory topology description plugin here turns the feature on for all those tests, which can introduce unrelated background activity/cleanup-cycle behavior and make failures harder to attribute. Consider enabling the plugin only in the dedicated topology-description tests (or via a per-test override) instead of in the global base fixture. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
