abbccdda commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r465243578
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ########## @@ -91,18 +100,20 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again. * @return the set of topics which had to be newly created */ - public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) { + public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) throws TaskAssignmentException { Review comment: Comments for thrown exception ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ########## @@ -383,18 +405,25 @@ public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() { EasyMock.expect(admin.describeTopics(Collections.singleton(topic))) .andReturn(new MockDescribeTopicsResult( Collections.singletonMap(topic, topicDescriptionFailFuture))) - .times(numRetries + 1); + .anyTimes(); EasyMock.expect(admin.createTopics(Collections.emptySet())) - .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once(); + .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).anyTimes(); EasyMock.replay(admin); final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap()); internalTopicConfig.setNumberOfPartitions(1); - assertThrows( - StreamsException.class, - () -> topicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig))); + final TaskAssignmentException exception = assertThrows( + TaskAssignmentException.class, + () -> topicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)) + ); + assertNull(exception.getCause()); + assertThat( + exception.getMessage(), + equalTo("Could not create topics within 50 milliseconds." + + " This can happen if the Kafka cluster is temporary not available.") Review comment: s/temporary/temporarily ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -373,8 +373,15 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr final Set<TaskId> statefulTasks = new HashSet<>(); - final boolean probingRebalanceNeeded = - assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks); + final boolean probingRebalanceNeeded; + try { + probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks); Review comment: Is this part of the initiative to throw a different exception? Could we update the summary of this PR? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -477,7 +484,7 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion, * @return map from repartition topic to its partition info */ private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Map<Integer, TopicsInfo> topicGroups, - final Cluster metadata) { + final Cluster metadata) throws TaskAssignmentException { Review comment: Could we update the meta comment to explain when a `TaskAssignmentException` is thrown? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org