chickenchickenlove commented on code in PR #15573:
URL: https://github.com/apache/kafka/pull/15573#discussion_r1542822737

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -523,7 +523,17 @@ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) {
         final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
         if (isMissingInputTopics) {
             if (!taskManager.topologyMetadata().hasNamedTopologies()) {
-                throw new MissingSourceTopicException("Missing source topics.");

Review Comment:
   <img width="948" alt="image" src="https://github.com/apache/kafka/assets/90125071/c00ef63e-c20b-4edf-9795-7b8b2b314975">

   I drew the image above to illustrate an idea for improvement.
   If you also want to read the missing source topics in `StreamsRebalanceListener`, this workaround could be one way to handle it.

   1. As you know, `ThreadLocal` provides storage specific to each thread.
   2. `StreamThread`, `StreamsRebalanceListener`, and `StreamsPartitionAssignor` all live in the internal package, so none of them is public API.
   3. `StreamsRebalanceListener` already holds a reference to `StreamThread`.

   So we only need three steps:
   1. Add a `ThreadLocal` field to `StreamThread`.
   2. Add a method that puts the missing source topics into the `ThreadLocal` before `MissingSourceTopicException` is thrown.
   3. Add code to `StreamsRebalanceListener` that reads the missing source topics from the `ThreadLocal`.

   Does this make sense to you?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
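
The three steps proposed in the review comment can be sketched roughly as follows. This is only an illustration, not code from the PR: `MissingSourceTopicsSketch`, `MISSING_SOURCE_TOPICS`, `assign`, and `readAndClear` are hypothetical names, `IllegalStateException` stands in for `MissingSourceTopicException`, and the sketch assumes that the assignor and the rebalance listener are invoked on the same thread, which is what makes a `ThreadLocal` hand-off possible at all.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, self-contained stand-in; none of these names exist in Kafka.
final class MissingSourceTopicsSketch {

    // Step 1: a per-thread slot, standing in for the ThreadLocal field proposed for StreamThread.
    static final ThreadLocal<Set<String>> MISSING_SOURCE_TOPICS =
        ThreadLocal.withInitial(Collections::emptySet);

    // Step 2: stand-in for the assignor. It records the missing topics, then throws.
    // IllegalStateException is used here in place of MissingSourceTopicException.
    static void assign(final Set<String> missingTopics) {
        if (!missingTopics.isEmpty()) {
            MISSING_SOURCE_TOPICS.set(new LinkedHashSet<>(missingTopics));
            throw new IllegalStateException("Missing source topics.");
        }
    }

    // Step 3: stand-in for the rebalance listener. It reads what was recorded
    // on the current thread and clears the slot so stale data is not reused.
    static Set<String> readAndClear() {
        final Set<String> topics = MISSING_SOURCE_TOPICS.get();
        MISSING_SOURCE_TOPICS.remove();
        return topics;
    }

    public static void main(final String[] args) {
        try {
            assign(Collections.singleton("input-a"));
        } catch (final IllegalStateException e) {
            // The listener side runs later on the same thread and can still see the topics.
            System.out.println("Missing: " + readAndClear());
        }
    }
}
```

Clearing the slot after reading is a design choice of this sketch: it keeps a later rebalance on the same thread from observing topics recorded by an earlier one.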