ableegoldman commented on a change in pull request #9648:
URL: https://github.com/apache/kafka/pull/9648#discussion_r529104011

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -149,24 +149,30 @@ public boolean hasPersistentGlobalStore() {
         return false;
     }

-    public void updateSourceTopics(final Map<String, List<String>> sourceTopicsByName) {
-        if (!sourceTopicsByName.keySet().equals(sourceNodesByName.keySet())) {
-            log.error("Set of source nodes do not match: \n" +
-                "sourceNodesByName = {}\n" +
-                "sourceTopicsByName = {}",
-                sourceNodesByName.keySet(), sourceTopicsByName.keySet());
-            throw new IllegalStateException("Tried to update source topics but source nodes did not match");
-        }
+    public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
         sourceNodesByTopic.clear();
-        for (final Map.Entry<String, List<String>> sourceEntry : sourceTopicsByName.entrySet()) {
-            final String nodeName = sourceEntry.getKey();
-            for (final String topic : sourceEntry.getValue()) {
+        for (final Map.Entry<String, SourceNode<?, ?, ?, ?>> sourceNodeEntry : sourceNodesByName.entrySet()) {

Review comment:
In addition to removing the fault check (the `IllegalStateException` thrown when the two sets of source node names did not match), I slightly refactored this loop so that it iterates only over the source nodes in this particular subtopology. Previously we iterated over the passed-in map, so we would have added entries for all source nodes across the entire topology to our `sourceNodesByTopic` map.
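
To illustrate the effect of the refactored loop, here is a minimal, self-contained sketch. It is not the actual `ProcessorTopology` code: the class `SubtopologySourceIndex` is hypothetical, plain strings stand in for `SourceNode` instances, and the loop body (including the null check) is an assumption, since the diff above shows only the loop header.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the relevant part of ProcessorTopology.
public class SubtopologySourceIndex {
    // Source nodes that belong to this subtopology only (node name -> node).
    private final Map<String, String> sourceNodesByName;
    // Rebuilt on every update (topic -> node).
    private final Map<String, String> sourceNodesByTopic = new HashMap<>();

    public SubtopologySourceIndex(final Map<String, String> sourceNodesByName) {
        this.sourceNodesByName = new HashMap<>(sourceNodesByName);
    }

    public void updateSourceTopics(final Map<String, List<String>> allSourceTopicsByNodeName) {
        sourceNodesByTopic.clear();
        // Iterate over this subtopology's own source nodes, not over the
        // topology-wide map that was passed in.
        for (final Map.Entry<String, String> sourceNodeEntry : sourceNodesByName.entrySet()) {
            final List<String> topics = allSourceTopicsByNodeName.get(sourceNodeEntry.getKey());
            if (topics == null) {
                continue; // assumption: skip nodes that have no entry in the update
            }
            for (final String topic : topics) {
                sourceNodesByTopic.put(topic, sourceNodeEntry.getValue());
            }
        }
    }

    public Map<String, String> sourceNodesByTopic() {
        return new HashMap<>(sourceNodesByTopic);
    }

    public static void main(final String[] args) {
        final Map<String, String> localNodes = new HashMap<>();
        localNodes.put("source-A", "node-A");

        // The update covers the whole topology, including a node we do not own.
        final Map<String, List<String>> allTopics = new HashMap<>();
        allTopics.put("source-A", Arrays.asList("topic-1"));
        allTopics.put("source-B", Arrays.asList("topic-2"));

        final SubtopologySourceIndex index = new SubtopologySourceIndex(localNodes);
        index.updateSourceTopics(allTopics);
        System.out.println(index.sourceNodesByTopic());
    }
}
```

With the old loop shape (iterating over the passed-in map), `topic-2` would also have been registered here even though `source-B` is not part of this subtopology.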