cadonna commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1537338065

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java:
##########
@@ -119,7 +119,8 @@ public Queue<StreamsException> missingSourceTopicExceptions() {
                 return new StreamsException(
                     new MissingSourceTopicException(String.format(
                         "Missing source topics %s for subtopology %d of topology %s",
-                        missingSourceTopics, subtopologyId, topologyName)),
+                        missingSourceTopics, subtopologyId, topologyName),
+                        missingSourceTopics),

Review Comment:
   Would it be possible to return a set of missing source topics from `RepartitionTopics`?


##########
streams/src/main/java/org/apache/kafka/streams/errors/MissingSourceTopicException.java:
##########
@@ -16,11 +16,25 @@
  */
 package org.apache.kafka.streams.errors;
 
+import java.util.HashSet;
+import java.util.Set;
+
 public class MissingSourceTopicException extends StreamsException {
     private final static long serialVersionUID = 1L;
+    private final Set<String> missingTopics;
 
     public MissingSourceTopicException(final String message) {
         super(message);
+        this.missingTopics = new HashSet<>();
+    }
+
+    public MissingSourceTopicException(final String message, final Set<String> missingTopics) {
+        super(message);
+        this.missingTopics = missingTopics;
+    }
+
+    public Set<String> getMissingTopics() {
+        return this.missingTopics;

Review Comment:
   This class is part of the public API. That means we cannot change it without a Kafka Improvement Proposal (KIP) [1].
   I am not sure that adding the missing source topics to the exception makes much sense, because the exception is caught in the `StreamsPartitionAssignor` [2] and transformed into a group assignment error (`INCOMPLETE_SOURCE_TOPIC_METADATA`). The missing source topics are not propagated to the point where the actual `MissingSourceTopicException` is thrown to the users, which is in the `StreamsRebalanceListener` [3]. Propagating them would require a protocol change, which I do not think is worth it.
   What we could do instead is log an error message with the missing source topics in `RepartitionTopics` or `StreamsPartitionAssignor`. I slightly prefer the latter.

   [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
   [2] https://github.com/apache/kafka/blob/d8dd068a626dcab538c2b234ffd8799a94b2f0ed/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java#L449
   [3] https://github.com/apache/kafka/blob/4fe4cdc4a61cbac8e070a8b5514403235194015b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java#L58


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();

Review Comment:
   Here you could then use the method on `RepartitionTopics` that I proposed in my other comment.


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   In addition to throwing the exception, you would also log the error.
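
To illustrate the direction suggested in these comments, here is a minimal, self-contained sketch: expose the set of missing source topics from the repartition-topics helper, log it on the assignor side, and keep throwing the exception with its existing message-only constructor so the public API stays untouched. Everything named here is hypothetical and not taken from the PR: `MissingSourceTopicsSketch`, the stand-in `RepartitionTopics` and `MissingSourceTopicException` classes, the `missingSourceTopics()` accessor, and `checkSourceTopics`. The real Kafka classes have different constructors and log through SLF4J; the sketch uses `java.util.logging` only to stay dependency-free.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;

class MissingSourceTopicsSketch {

    private static final Logger LOG = Logger.getLogger(MissingSourceTopicsSketch.class.getName());

    // Hypothetical stand-in for the public exception: only the message-only
    // constructor is used, so no public API change is required.
    static class MissingSourceTopicException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        MissingSourceTopicException(final String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for RepartitionTopics: maps a subtopology id
    // to the source topics that are missing for it.
    static class RepartitionTopics {
        private final Map<String, Set<String>> missingTopicsBySubtopology;

        RepartitionTopics(final Map<String, Set<String>> missingTopicsBySubtopology) {
            this.missingTopicsBySubtopology = missingTopicsBySubtopology;
        }

        // Assumed accessor: all missing source topics across subtopologies,
        // sorted so that the log output is stable.
        Set<String> missingSourceTopics() {
            final Set<String> all = new TreeSet<>();
            for (final Set<String> topics : missingTopicsBySubtopology.values()) {
                all.addAll(topics);
            }
            return Collections.unmodifiableSet(all);
        }
    }

    // Assumed assignor-side check: log the missing topics, then throw the
    // exception with the same message as before.
    static void checkSourceTopics(final RepartitionTopics repartitionTopics) {
        final Set<String> missing = repartitionTopics.missingSourceTopics();
        if (!missing.isEmpty()) {
            LOG.severe("Missing source topics: " + missing);
            throw new MissingSourceTopicException("Missing source topics.");
        }
    }

    public static void main(final String[] args) {
        final Map<String, Set<String>> missing = new LinkedHashMap<>();
        missing.put("0", new HashSet<>(Arrays.asList("orders")));
        missing.put("1", new HashSet<>(Arrays.asList("payments", "orders")));
        try {
            checkSourceTopics(new RepartitionTopics(missing));
        } catch (final MissingSourceTopicException e) {
            System.out.println("Caught: " + e.getMessage());
        }
    }
}
```

With this shape the topic names only appear in the log of the instance that detects them, which fits the point made above that they cannot reach the place where the exception is finally thrown to users without a protocol change.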