ableegoldman commented on a change in pull request #11857:
URL: https://github.com/apache/kafka/pull/11857#discussion_r820593601



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -881,17 +881,15 @@ private void initializeAndRestorePhase() {
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
         if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
+            log.info("StreamThread has detected an update to the topology");
+
             taskManager.handleTopologyUpdates();
-            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
-            if (topologyMetadata.isEmpty()) {
-                mainConsumer.unsubscribe();
-            }
-            topologyMetadata.maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(getName());

Review comment:
       All of this has been moved into `taskManager#handleTopologyUpdates`
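
       For anyone reading this hunk without the rest of the PR, here is a rough, self-contained sketch of the shape of that move. It is based only on the lines removed above and is not the actual `TaskManager` code: `FakeTopologyMetadata`, `FakeConsumer`, `TaskManagerSketch`, and the recorded event strings are hypothetical stand-ins, and the real `handleTopologyUpdates` may do more than this or do it in a different order.

```java
import java.util.ArrayList;
import java.util.List;

public class TopologyUpdateSketch {

    // Hypothetical stand-in for TopologyMetadata; only the calls visible in the diff are modeled.
    static class FakeTopologyMetadata {
        private final boolean empty;
        private final List<String> events;

        FakeTopologyMetadata(boolean empty, List<String> events) {
            this.empty = empty;
            this.events = events;
        }

        boolean isEmpty() {
            return empty;
        }

        // Assumption for the sketch: an update is always pending.
        boolean needsUpdate(String threadName) {
            return true;
        }

        void maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(String threadName) {
            events.add("notify:" + threadName);
        }
    }

    // Hypothetical stand-in for the main consumer.
    static class FakeConsumer {
        private final List<String> events;

        FakeConsumer(List<String> events) {
            this.events = events;
        }

        void unsubscribe() {
            events.add("unsubscribe");
        }
    }

    // Sketch of the task manager after the move: it now owns the steps
    // that were deleted from StreamThread#checkForTopologyUpdates.
    static class TaskManagerSketch {
        private final FakeTopologyMetadata topologyMetadata;
        private final FakeConsumer mainConsumer;
        private final String threadName;

        TaskManagerSketch(FakeTopologyMetadata topologyMetadata, FakeConsumer mainConsumer, String threadName) {
            this.topologyMetadata = topologyMetadata;
            this.mainConsumer = mainConsumer;
            this.threadName = threadName;
        }

        void handleTopologyUpdates() {
            if (topologyMetadata.isEmpty()) {
                mainConsumer.unsubscribe();
            }
            topologyMetadata.maybeNotifyTopologyVersionWaitersAndUpdateThreadsTopologyVersion(threadName);
        }
    }

    // Mirrors the slimmed-down thread-side check: log, then delegate.
    static List<String> checkForTopologyUpdates(boolean topologyIsEmpty, String threadName) {
        List<String> events = new ArrayList<>();
        FakeTopologyMetadata metadata = new FakeTopologyMetadata(topologyIsEmpty, events);
        TaskManagerSketch taskManager = new TaskManagerSketch(metadata, new FakeConsumer(events), threadName);
        if (metadata.isEmpty() || metadata.needsUpdate(threadName)) {
            events.add("log");
            taskManager.handleTopologyUpdates();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(checkForTopologyUpdates(true, "thread-1"));
        System.out.println(checkForTopologyUpdates(false, "thread-1"));
    }
}
```

       The point of the sketch is only that the thread keeps the check and the log line, while the unsubscribe and version-notification steps sit behind a single `handleTopologyUpdates` call.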



