ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682231789



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -105,31 +105,76 @@ private KafkaStreamsNamedTopologyWrapper(final Collection<NamedTopology> topolog
                     (v1, v2) -> {
                         throw new IllegalArgumentException("Topology names must be unique");
                     },
-                    () -> new TreeMap<>())),
+                    () -> new ConcurrentSkipListMap<>())),
                 config),
             config,
             clientSupplier
         );
-        for (final NamedTopology topology : topologies) {
-            nameToTopology.put(topology.name(), topology);
-        }
     }
 
-    public NamedTopology getTopologyByName(final String name) {
-        if (nameToTopology.containsKey(name)) {
-            return nameToTopology.get(name);
-        } else {
-            throw new IllegalArgumentException("Unable to locate a NamedTopology called " + name);
+    /**
+     * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name
+     */
+    public Optional<NamedTopology> getTopologyByName(final String name) {
+        return Optional.ofNullable(topologyMetadata.getBuilderForTopologyName(name)).map(InternalTopologyBuilder::namedTopology);
+    }
+
+    /**
+     * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
+     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * it to begin processing the new topology.
+     *
+     * @throws IllegalArgumentException if this topology name is already in use
+     * @throws IllegalStateException    if streams has not been started or has already shut down
+     * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
+     */
+    public void addNamedTopology(final NamedTopology newTopology) {
+        if (hasStartedOrFinishedShuttingDown()) {
+            throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
+        } else if (getTopologyByName(newTopology.name()).isPresent()) {
+            throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
+                                                   " as another of the same name already exists");
         }
+        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
     }
 
-    public void addNamedTopology(final NamedTopology topology) {
-        nameToTopology.put(topology.name(), topology);
-        throw new UnsupportedOperationException();
+    /**
+     * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
+     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * it stops processing the old topology.
+     *
+     * @throws IllegalArgumentException if this topology name cannot be found
+     * @throws IllegalStateException    if streams has not been started or has already shut down
+     * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
+     */
+    public void removeNamedTopology(final String topologyToRemove) {
+        if (!isRunningOrRebalancing()) {
+            throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
+        } else if (!getTopologyByName(topologyToRemove).isPresent()) {
+            throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
+        }
+
+        topologyMetadata.unregisterTopology(topologyToRemove);
     }
 
-    public void removeNamedTopology(final String namedTopology) {
-        throw new UnsupportedOperationException();
+    /**
+     * Do a clean up of the local state directory for this NamedTopology by deleting all data with regard to the
+     * @link StreamsConfig#APPLICATION_ID_CONFIG application ID} in the ({@link StreamsConfig#STATE_DIR_CONFIG})
+     * <p>
+     * May be called while the Streams is in any state, but only on a {@link NamedTopology} that has already been
+     * removed via {@link #removeNamedTopology(String)}.
+     * <p>
+     * Calling this method triggers a restore of local {@link StateStore}s for this {@link NamedTopology} if it is
+     * ever re-added via {@link #addNamedTopology(NamedTopology)}.
+     *
+     * @throws IllegalStateException if this {@code NamedTopology} hasn't been removed
+     * @throws StreamsException if cleanup failed
+     */
+    public void cleanUpNamedTopology(final String name) {
+        if (getTopologyByName(name).isPresent()) {
+            throw new IllegalStateException("Can't clean up local state for an active NamedTopology");

Review comment:
Yes, that's how it should be: you need to remove the named topology before cleaning up its state, just like you can't call `KafkaStreams#cleanUp` while the application is running.
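
To make the intended ordering concrete, here is a minimal, self-contained sketch of the remove-then-clean-up contract. It is only an illustration: `NamedTopologyLifecycleSketch`, its `String` placeholders for topologies, and the `isCleanedUp` helper are hypothetical and are not part of Kafka Streams or this PR. It mirrors only the guard shown in the diff, where cleanup is rejected while a topology of that name is still registered.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical stand-in for the wrapper in the diff; NOT the Kafka Streams implementation.
class NamedTopologyLifecycleSketch {
    // Topology name -> placeholder description (assumption: real code stores topology builders).
    private final ConcurrentSkipListMap<String, String> topologies = new ConcurrentSkipListMap<>();
    // Names whose local state has been "cleaned up" in this toy model.
    private final Set<String> cleanedUp = new ConcurrentSkipListSet<>();

    Optional<String> getTopologyByName(final String name) {
        return Optional.ofNullable(topologies.get(name));
    }

    void addNamedTopology(final String name, final String description) {
        if (topologies.putIfAbsent(name, description) != null) {
            throw new IllegalArgumentException("Topology name already in use: " + name);
        }
        // Re-adding a topology means its state is live again in this model.
        cleanedUp.remove(name);
    }

    void removeNamedTopology(final String name) {
        if (topologies.remove(name) == null) {
            throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + name);
        }
    }

    void cleanUpNamedTopology(final String name) {
        // Same guard as in the diff: cleanup is only legal once the topology is gone.
        if (getTopologyByName(name).isPresent()) {
            throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
        }
        cleanedUp.add(name);
    }

    boolean isCleanedUp(final String name) {
        return cleanedUp.contains(name);
    }

    public static void main(final String[] args) {
        final NamedTopologyLifecycleSketch app = new NamedTopologyLifecycleSketch();
        app.addNamedTopology("orders", "placeholder topology");
        try {
            app.cleanUpNamedTopology("orders"); // rejected: topology is still active
        } catch (final IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        app.removeNamedTopology("orders");      // remove first...
        app.cleanUpNamedTopology("orders");     // ...then cleanup succeeds
        System.out.println("Cleaned up: " + app.isCleanedUp("orders"));
    }
}
```

In this sketch the caller is responsible for the ordering, which matches the behavior described above: cleanup fails fast rather than silently removing the topology on the caller's behalf.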





