cadonna commented on a change in pull request #11686: URL: https://github.com/apache/kafka/pull/11686#discussion_r789633567
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java ########## @@ -97,49 +101,74 @@ public boolean setup() { } } } + } - return missingUserInputTopicsPerTopology.isEmpty(); + public Set<String> topologiesWithMissingInputTopics() { + return missingInputTopicsBySubtopology.keySet() + .stream() + .map(s -> getTopologyNameOrElseUnnamed(s.namedTopology)) + .collect(Collectors.toSet()); } - public Map<String, Set<String>> missingUserInputTopicsPerTopology() { - return Collections.unmodifiableMap(missingUserInputTopicsPerTopology); + public Queue<StreamsException> missingSourceTopicExceptions() { + return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> { + final Set<String> missingSourceTopics = entry.getValue(); + final int subtopologyId = entry.getKey().nodeGroupId; + final String topologyName = entry.getKey().namedTopology; + + return new StreamsException( + new MissingSourceTopicException(String.format( + "Missing source topics %s for subtopology %s of topology %s", + missingSourceTopics, subtopologyId, topologyName)), + new TaskId(subtopologyId, 0, topologyName)); Review comment: Now I see that that information might be useful in the user-specified exception handler. Thank you for the clarification! Maybe we should use `-1` instead of `0` for the partition number, since the partition number doe not contain any information. Just an idea, not necessarily needed for merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org