cadonna commented on a change in pull request #11686:
URL: https://github.com/apache/kafka/pull/11686#discussion_r789633567



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
##########
@@ -97,49 +101,74 @@ public boolean setup() {
                 }
             }
         }
+    }
 
-        return missingUserInputTopicsPerTopology.isEmpty();
+    public Set<String> topologiesWithMissingInputTopics() {
+        return missingInputTopicsBySubtopology.keySet()
+            .stream()
+            .map(s -> getTopologyNameOrElseUnnamed(s.namedTopology))
+            .collect(Collectors.toSet());
     }
 
-    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
-        return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
+    public Queue<StreamsException> missingSourceTopicExceptions() {
+        return missingInputTopicsBySubtopology.entrySet().stream().map(entry 
-> {
+            final Set<String> missingSourceTopics = entry.getValue();
+            final int subtopologyId = entry.getKey().nodeGroupId;
+            final String topologyName = entry.getKey().namedTopology;
+
+            return new StreamsException(
+                new MissingSourceTopicException(String.format(
+                    "Missing source topics %s for subtopology %s of topology 
%s",
+                    missingSourceTopics, subtopologyId, topologyName)),
+                new TaskId(subtopologyId, 0, topologyName));

Review comment:
       Now I see that that information might be useful in the user-specified 
exception handler. Thank you for the clarification! Maybe we should use `-1` 
instead of `0` for the partition number, since the partition number doe not 
contain any information. Just an idea, not necessarily needed for merging.  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to