ableegoldman commented on a change in pull request #11857:
URL: https://github.com/apache/kafka/pull/11857#discussion_r820595568
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1129,13 +1129,25 @@ public void updateTaskEndMetadata(final TopicPartition
topicPartition, final Lon
* added NamedTopology and create them if so, then close any tasks whose
named topology no longer exists
*/
void handleTopologyUpdates() {
- tasks.maybeCreateTasksFromNewTopologies();
+ final Set<String> currentNamedTopologies =
topologyMetadata.updateThreadTopologyVersion(Thread.currentThread().getName());
Review comment:
This isn't the main fix, but we were playing a little fast and loose
with the topology version we were reporting having ack'ed -- tightened this up
by first atomically updating the topology version and saving the set of current
named topologies, then doing the actual update handling, and _then_ checking
the listeners and completing any finished add/remove topology requests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]