wcarlson5 commented on a change in pull request #11479: URL: https://github.com/apache/kafka/pull/11479#discussion_r756321051
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -108,6 +124,44 @@ private void unlock() { version.topologyLock.unlock(); } + public Collection<String> sourceTopicsForTopology(final String name) { + return builders.get(name).sourceTopicCollection(); + } + + public boolean needsUpdate(final String threadName) { + return threadVersions.get(threadName) < topologyVersion(); + } + + public void registerThread(final String threadName) { + threadVersions.put(threadName, 0L); + } + + public void unregisterThread(final String threadName) { + threadVersions.remove(threadName); + } + + public void reachedLatestVersion(final String threadName) { Review comment: Sure. I will go with `maybeNotifyTopologyVersionWaiters` We can also go with `updateThreadVersions` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running, - * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for + * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for * it to begin processing the new topology. * * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void addNamedTopology(final NamedTopology newTopology) { + public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) { + log.debug("Adding topology: {}", newTopology.name()); if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return new AddNamedTopologyResult( + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()) + ); } /** * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are - * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure Review comment: How about I update the javadoc to mention this is not purely async same for add topology? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ########## @@ -108,6 +124,44 @@ private void unlock() { version.topologyLock.unlock(); } + public Collection<String> sourceTopicsForTopology(final String name) { + return builders.get(name).sourceTopicCollection(); + } + + public boolean needsUpdate(final String threadName) { + return threadVersions.get(threadName) < topologyVersion(); + } + + public void registerThread(final String threadName) { + threadVersions.put(threadName, 0L); + } + + public void unregisterThread(final String threadName) { + threadVersions.remove(threadName); + } + + public void reachedLatestVersion(final String threadName) { + try { + lock(); + final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator(); Review comment: Sure. We can talk about it more on that ticket: https://confluentinc.atlassian.net/browse/KCI-1250 ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ########## @@ -908,14 +907,16 @@ private void initializeAndRestorePhase() { // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology private void checkForTopologyUpdates() { - if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) { - lastSeenTopologyVersion = topologyMetadata.topologyVersion(); + if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) { taskManager.handleTopologyUpdates(); + log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment"); + if (topologyMetadata.isEmpty()) { Review comment: I gave it a shot but it causes test to fail when removing a topology that is not that last one. I will leave a comment ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running, - * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for + * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for * it to begin processing the new topology. * * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void addNamedTopology(final NamedTopology newTopology) { + public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) { + log.debug("Adding topology: {}", newTopology.name()); if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return new AddNamedTopologyResult( + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()) + ); } /** * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are - * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure * it stops processing the old topology. * + * @param topologyToRemove name of the topology to be removed + * @param resetOffsets whether to reset the committed offsets for any source topics + * * @throws IllegalArgumentException if this topology name cannot be found * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void removeNamedTopology(final String topologyToRemove) { + public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) { + log.debug("Removing topology: {}", topologyToRemove); if (!isRunningOrRebalancing()) { throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state); } else if (!getTopologyByName(topologyToRemove).isPresent()) { throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove); } + final Set<TopicPartition> partitionsToReset = metadataForLocalThreads() + .stream() + .flatMap(t -> { + final Set<TaskMetadata> tasks = new HashSet<>(t.activeTasks()); + return tasks.stream(); + }) + .flatMap(t -> t.topicPartitions().stream()) + .filter(t -> topologyMetadata.sourceTopicsForTopology(topologyToRemove).contains(t.topic())) + .collect(Collectors.toSet()); + + final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove); - topologyMetadata.unregisterTopology(topologyToRemove); + if (resetOffsets) { + final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + log.info("partitions to reset: {}", partitionsToReset); + if (!partitionsToReset.isEmpty()) { + removeTopologyFuture.whenComplete((v, throwable) -> { + if (throwable != null) { + future.completeExceptionally(throwable); + } + DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; + while (deleteOffsetsResult == null) { Review comment: thats a good point. I will add a check for that. That does work. in adding that check I found that something interesting. You can't block on one client while never removing the topology on the other, then the rebalance will keep failing and it will block forever. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ########## @@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running, - * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for + * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for * it to begin processing the new topology. * * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void addNamedTopology(final NamedTopology newTopology) { + public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) { + log.debug("Adding topology: {}", newTopology.name()); if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return new AddNamedTopologyResult( + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()) + ); } /** * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are - * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure Review comment: I am not sure I understand? If we don't call get on the result it breaks the tests. pretty much blocking is not enough. It might not be purely async but it for sure not blocking. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org