ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529104011



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }
 
-    public void updateSourceTopics(final Map<String, List<String>> 
sourceTopicsByName) {
-        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-            log.error("Set of source nodes do not match: \n" +
-                "sourceNodesByName = {}\n" +
-                "sourceTopicsByName = {}",
-                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-            throw new IllegalStateException("Tried to update source topics but 
source nodes did not match");
-        }
+    public void updateSourceTopics(final Map<String, List<String>> 
allSourceTopicsByNodeName) {
         sourceNodesByTopic.clear();
-        for (final Map.Entry<String, List<String>> sourceEntry : 
sourceTopicsByName.entrySet()) {
-            final String nodeName = sourceEntry.getKey();
-            for (final String topic : sourceEntry.getValue()) {
+        for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : 
sourceNodesByName.entrySet()) {

Review comment:
       In addition to removing the fault check, I slightly refactored this loop 
so that we only loop over the source nodes in this particular subtopology. 
Previously we would have added entries for all source nodes across the entire 
topology to our `sourceNodesByTopic` map




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to