cadonna commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537338065


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##########
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
             return new StreamsException(
                 new MissingSourceTopicException(String.format(
                     "Missing source topics %s for subtopology %d of topology %s",
-                    missingSourceTopics, subtopologyId, topologyName)),
+                    missingSourceTopics, subtopologyId, topologyName),
+                    missingSourceTopics),

Review Comment:
   Would it be possible to return a set of missing source topics from 
`RepartitionTopics`?
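
To make the question concrete, here is a minimal sketch of such an accessor. It is not the real `RepartitionTopics` class: `RepartitionTopicsSketch`, `recordMissing`, `missingSourceTopics`, and the per-subtopology map are hypothetical names and bookkeeping assumed only for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for RepartitionTopics; not the real Kafka Streams class.
class RepartitionTopicsSketch {
    // Assumed bookkeeping: missing source topics keyed by subtopology id.
    private final Map<Integer, Set<String>> missingBySubtopology = new LinkedHashMap<>();

    // Remember that the given topics were not found for one subtopology.
    void recordMissing(final int subtopologyId, final Set<String> topics) {
        missingBySubtopology
            .computeIfAbsent(subtopologyId, id -> new TreeSet<>())
            .addAll(topics);
    }

    // Return every missing source topic across all subtopologies, de-duplicated
    // and wrapped so that callers cannot modify the result.
    Set<String> missingSourceTopics() {
        final Set<String> all = new TreeSet<>();
        for (final Set<String> topics : missingBySubtopology.values()) {
            all.addAll(topics);
        }
        return Collections.unmodifiableSet(all);
    }
}
```

A caller could then test `missingSourceTopics().isEmpty()` instead of inspecting the queue of exceptions.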



##########
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java:
##########
@@ -16,11 +16,25 @@
  */
 package org.apache.kafka.streams.errors;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class MissingSourceTopicException extends StreamsException {
 
     private final static long serialVersionUID = 1L;
+    private final Set<String> missingTopics;
 
     public MissingSourceTopicException(final String message) {
         super(message);
+        this.missingTopics = new HashSet<>();
+    }
+
+    public MissingSourceTopicException(final String message, final Set<String> missingTopics) {
+        super(message);
+        this.missingTopics = missingTopics;
+    }
+
+    public Set<String> getMissingTopics() {
+        return this.missingTopics;

Review Comment:
   This class is part of the public API. That means we cannot change it
   without a Kafka Improvement Proposal (KIP) [1].
   I am not sure that adding the missing source topics to the exception makes
   much sense, because the exception is caught in the `StreamsPartitionAssignor`
   [2] and transformed into a group assignment error
   (`INCOMPLETE_SOURCE_TOPIC_METADATA`). The missing source topics are not
   propagated to the point where the actual `MissingSourceTopicException` is
   thrown to the users, which is in the `StreamsRebalanceListener` [3].
   Propagating them would require a protocol change, which I do not think is
   worth it.
   What we could do instead is log an error message with the missing source
   topics in `RepartitionTopics` or `StreamsPartitionAssignor`. I slightly prefer
   the latter.
   
   [1] 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
   [2] 
https://github.com/apache/kafka/blob/d8dd068a626dcab538c2b234ffd8799a94b2f0ed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java#L449
   [3] 
https://github.com/apache/kafka/blob/4fe4cdc4a61cbac8e070a8b5514403235194015b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java#L58
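
The propagation issue described in this comment can be illustrated with a small, self-contained sketch. It only mimics the flow: `ErrorPropagationSketch`, `AssignorErrorSketch`, `assign`, and `onAssignment` are hypothetical names, `IllegalStateException` stands in for `MissingSourceTopicException`, and the messages are placeholders rather than the real ones.

```java
import java.util.Set;

// Hypothetical miniature of the flow; these are not the Kafka Streams classes.
class ErrorPropagationSketch {
    enum AssignorErrorSketch { NONE, INCOMPLETE_SOURCE_TOPIC_METADATA }

    // Assignor side: the detailed failure is caught and reduced to an error code.
    static AssignorErrorSketch assign(final Set<String> missingSourceTopics) {
        try {
            if (!missingSourceTopics.isEmpty()) {
                throw new IllegalStateException("Missing source topics " + missingSourceTopics);
            }
            return AssignorErrorSketch.NONE;
        } catch (final IllegalStateException e) {
            // The topic names held by the exception are dropped here.
            return AssignorErrorSketch.INCOMPLETE_SOURCE_TOPIC_METADATA;
        }
    }

    // Listener side: only the error code arrives, so the exception thrown to
    // the user cannot name the topics.
    static void onAssignment(final AssignorErrorSketch error) {
        if (error == AssignorErrorSketch.INCOMPLETE_SOURCE_TOPIC_METADATA) {
            throw new IllegalStateException("Source topics were missing.");
        }
    }
}
```

Because only the error code crosses from the assignor to the listener in this model, carrying the topic names along would mean changing what is exchanged, which is the protocol change mentioned above.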



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();

Review Comment:
   Here you could then use the method on `RepartitionTopics` that I proposed
   in my other comment.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   In addition to throwing the exception, you would also log the error.
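
A rough sketch of "log, then throw" follows. It is an illustration under assumptions, not the assignor's code: `AssignorLoggingSketch` and its methods are hypothetical, `java.util.logging` is swapped in for the logger Kafka Streams actually uses so that the example is self-contained, and `IllegalStateException` stands in for `MissingSourceTopicException`.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;

// Hypothetical stand-in for the relevant part of the assignor.
class AssignorLoggingSketch {
    private static final Logger LOG =
        Logger.getLogger(AssignorLoggingSketch.class.getName());

    // Build the log line; topics are sorted so the output is deterministic.
    static String describeMissing(final Set<String> missingSourceTopics) {
        return "Missing source topics: " + new TreeSet<>(missingSourceTopics);
    }

    // Log the concrete topic names, then throw with the unchanged message so
    // that the exception class itself needs no new field.
    static void failIfSourceTopicsMissing(final Set<String> missingSourceTopics) {
        if (missingSourceTopics.isEmpty()) {
            return;
        }
        LOG.severe(describeMissing(missingSourceTopics));
        throw new IllegalStateException("Missing source topics.");
    }
}
```

The topic names end up in the log on the instance that runs the assignment, while the exception message and type stay as they are.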



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
