Copilot commented on code in PR #22511: URL: https://github.com/apache/kafka/pull/22511#discussion_r3386688916
########## group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/streams/StreamsGroupTopologyDescriptionPlugin.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.api.streams; + +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.concurrent.CompletableFuture; + +/** + * A broker-side plugin that stores, forwards, or exposes topology descriptions pushed + * by Kafka Streams clients. + * + * <p>Implementations must be thread-safe. {@link #setTopology} may be called + * concurrently by multiple members of the same group; calls with the same + * {@code (groupId, topologyEpoch)} carry identical data and must be idempotent. + * {@link #deleteTopology} must also be idempotent — it may be called more than once + * for the same {@code groupId}, including when nothing is stored. + */ [email protected] +public interface StreamsGroupTopologyDescriptionPlugin extends Configurable, AutoCloseable { + + /** + * Store the topology description for a streams group. + * + * <p>The returned future completes when the topology has been persisted or forwarded. + * Failures must be signalled by completing the future exceptionally — implementations + * must not throw synchronously; a synchronous throw is treated as a permanent failure + * with a generic client-visible error message. The completion exception drives + * broker-side behaviour: + * + * <ul> + * <li>{@link StreamsTopologyDescriptionPermanentFailureException} — the description will never be accepted + * at this topology epoch (e.g. too large, semantically rejected). The broker + * ratchets {@code LastFailedTopologyEpoch} and stops re-soliciting until the + * epoch advances.</li> + * <li>{@link StreamsTopologyDescriptionTransientFailureException} or any other exception — treated as + * transient. The broker arms or extends the per-group back-off (30 s → 1 h, + * exponential) and re-solicits on a later heartbeat.</li> + * </ul> + * + * In both cases the caller receives error code + * {@code STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED} with the exception's message in + * {@code ErrorMessage}; the permanent-vs-transient split is broker-internal state. + */ + CompletableFuture<Void> setTopology(String groupId, int topologyEpoch, StreamsGroupTopologyDescription description); + + /** + * Remove any topology description stored for this group. Called when the group is + * deleted or expires. A failure (future completed exceptionally) is reported to the + * caller of {@code DeleteGroups} as {@code GROUP_DELETION_FAILED} with the exception + * message in the per-group {@code ErrorMessage}, and the broker does not tombstone + * the group; a retry of {@code DeleteGroups} re-invokes this method idempotently. + * The periodic-cleanup path treats a failure identically — the group's tombstone is + * deferred to a future cycle. + */ + CompletableFuture<Void> deleteTopology(String groupId); + + /** + * Return the stored topology description for {@code (groupId, topologyEpoch)}, or + * {@code null} if the plugin no longer has the data (e.g. backend wipe). If the future + * completes exceptionally, the broker reports a read error for the group. + */ + CompletableFuture<StreamsGroupTopologyDescription> getTopology(String groupId, int topologyEpoch); +} Review Comment: Because this SPI extends AutoCloseable but does not redeclare `close()`, the inherited signature is `close() throws Exception`, forcing callers to handle a checked exception even though most Kafka plugin SPIs use a no-throws `close()`. Redeclare `void close();` in the interface to narrow the throws clause and make lifecycle management easier for callers. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java: ########## @@ -821,6 +826,90 @@ public void testDLQTopicNamePrefixCustomValue() { assertEquals("my-dlq-", config.errorsDLQTopicNamePrefix()); } + @Test + public void testStreamsGroupTopologyDescriptionPluginDefaultIsNull() { + GroupCoordinatorConfig config = createConfig(new HashMap<>()); + assertNull(config.streamsGroupTopologyDescriptionPlugin(Map.of())); + } + + @Test + public void testStreamsGroupTopologyDescriptionPluginLoadedAndConfigured() { + Map<String, Object> configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + TestTopologyDescriptionPlugin.class.getName()); + GroupCoordinatorConfig config = createConfig(configs); + + StreamsGroupTopologyDescriptionPlugin plugin = + config.streamsGroupTopologyDescriptionPlugin(Map.of()); + assertInstanceOf(TestTopologyDescriptionPlugin.class, plugin); + assertNotNull(((TestTopologyDescriptionPlugin) plugin).configs); + } + + @Test + public void testStreamsGroupTopologyDescriptionPluginAcceptsClassObject() { + Map<String, Object> configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + TestTopologyDescriptionPlugin.class); + GroupCoordinatorConfig config = createConfig(configs); + + assertInstanceOf(TestTopologyDescriptionPlugin.class, + config.streamsGroupTopologyDescriptionPlugin(Map.of())); + } Review Comment: This test instantiates a plugin but does not close it. Even though the test plugin’s `close()` is a no-op, closing here ensures the test reflects the intended lifecycle contract and avoids leaks when the test is adapted to real implementations. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java: ########## @@ -821,6 +826,90 @@ public void testDLQTopicNamePrefixCustomValue() { assertEquals("my-dlq-", config.errorsDLQTopicNamePrefix()); } + @Test + public void testStreamsGroupTopologyDescriptionPluginDefaultIsNull() { + GroupCoordinatorConfig config = createConfig(new HashMap<>()); + assertNull(config.streamsGroupTopologyDescriptionPlugin(Map.of())); + } + + @Test + public void testStreamsGroupTopologyDescriptionPluginLoadedAndConfigured() { + Map<String, Object> configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + TestTopologyDescriptionPlugin.class.getName()); + GroupCoordinatorConfig config = createConfig(configs); + + StreamsGroupTopologyDescriptionPlugin plugin = + config.streamsGroupTopologyDescriptionPlugin(Map.of()); + assertInstanceOf(TestTopologyDescriptionPlugin.class, plugin); + assertNotNull(((TestTopologyDescriptionPlugin) plugin).configs); + } + + @Test + public void testStreamsGroupTopologyDescriptionPluginAcceptsClassObject() { + Map<String, Object> configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + TestTopologyDescriptionPlugin.class); + GroupCoordinatorConfig config = createConfig(configs); + + assertInstanceOf(TestTopologyDescriptionPlugin.class, + config.streamsGroupTopologyDescriptionPlugin(Map.of())); + } + + @Test + public void testStreamsGroupTopologyDescriptionPluginReceivesAdditionalConfigs() { + Map<String, Object> configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + TestTopologyDescriptionPlugin.class); + GroupCoordinatorConfig config = createConfig(configs); + + try (TestTopologyDescriptionPlugin plugin = (TestTopologyDescriptionPlugin) + config.streamsGroupTopologyDescriptionPlugin(Map.of("injected.handle", "value"))) { + assertEquals("value", plugin.configs.get("injected.handle")); + } + } + + @Test + public void testStreamsGroupTopologyDescriptionPluginReturnsFreshInstancePerCall() { + Map<String, Object> configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + TestTopologyDescriptionPlugin.class); + GroupCoordinatorConfig config = createConfig(configs); + + StreamsGroupTopologyDescriptionPlugin first = + config.streamsGroupTopologyDescriptionPlugin(Map.of()); + StreamsGroupTopologyDescriptionPlugin second = + config.streamsGroupTopologyDescriptionPlugin(Map.of()); + assertNotSame(first, second); + } Review Comment: This test creates two plugin instances but does not close either. Since the accessor returns a fresh instance per call, the test should close both instances to avoid leaks for implementations that allocate resources. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java: ########## @@ -821,6 +826,90 @@ public void testDLQTopicNamePrefixCustomValue() { assertEquals("my-dlq-", config.errorsDLQTopicNamePrefix()); } + @Test + public void testStreamsGroupTopologyDescriptionPluginDefaultIsNull() { + GroupCoordinatorConfig config = createConfig(new HashMap<>()); + assertNull(config.streamsGroupTopologyDescriptionPlugin(Map.of())); + } + + @Test + public void testStreamsGroupTopologyDescriptionPluginLoadedAndConfigured() { + Map<String, Object> configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TOPOLOGY_DESCRIPTION_PLUGIN_CLASS_CONFIG, + TestTopologyDescriptionPlugin.class.getName()); + GroupCoordinatorConfig config = createConfig(configs); + + StreamsGroupTopologyDescriptionPlugin plugin = + config.streamsGroupTopologyDescriptionPlugin(Map.of()); + assertInstanceOf(TestTopologyDescriptionPlugin.class, plugin); + assertNotNull(((TestTopologyDescriptionPlugin) plugin).configs); + } Review Comment: This test creates a plugin instance but never closes it. Since the config API explicitly returns a new plugin instance per call and the caller owns the lifecycle, tests should close the instance to avoid resource leaks when real implementations allocate resources. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
