abbccdda commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r465243578



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -91,18 +100,20 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
      * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
      * @return the set of topics which had to be newly created
      */
-    public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
+    public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) throws TaskAssignmentException {

Review comment:
       Please add a comment documenting the thrown exception.
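
       A rough sketch of the kind of `@throws` entry I have in mind. Everything here is a placeholder and not the actual code: the nested `TaskAssignmentException` is a stand-in for the real Streams class, the `clusterAvailable` flag is hypothetical and only simulates the timeout, and the Javadoc wording is a suggestion based on the message asserted in the test.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class MakeReadyJavadocSketch {

    /** Hypothetical stand-in for the real TaskAssignmentException. */
    public static class TaskAssignmentException extends RuntimeException {
        public TaskAssignmentException(final String message) {
            super(message);
        }
    }

    /**
     * Prepares the given internal topics (simplified stand-in for makeReady).
     *
     * @param topics           internal topics to prepare, keyed by topic name
     * @param clusterAvailable hypothetical flag simulating whether the cluster responds in time
     * @return the set of topics which had to be newly created
     * @throws TaskAssignmentException if the topics could not be created within the timeout,
     *                                 e.g. because the cluster is temporarily not available
     */
    public static Set<String> makeReady(final Map<String, Integer> topics,
                                        final boolean clusterAvailable) {
        if (!clusterAvailable) {
            // Simulates the timeout path that the new exception type reports.
            throw new TaskAssignmentException("Could not create topics within the timeout.");
        }
        // In this sketch every requested topic counts as newly created.
        return topics.keySet();
    }

    public static void main(final String[] args) {
        System.out.println(makeReady(Collections.singletonMap("repartition", 1), true));
    }
}
```

       The point is only the `@throws` line: callers can then see from the Javadoc under which condition the exception surfaces.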

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java
##########
@@ -383,18 +405,25 @@ public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() {
         EasyMock.expect(admin.describeTopics(Collections.singleton(topic)))
             .andReturn(new MockDescribeTopicsResult(
                 Collections.singletonMap(topic, topicDescriptionFailFuture)))
-            .times(numRetries + 1);
+            .anyTimes();
         EasyMock.expect(admin.createTopics(Collections.emptySet()))
-            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).once();
+            .andReturn(new MockCreateTopicsResult(Collections.emptyMap())).anyTimes();
 
         EasyMock.replay(admin);
 
         final InternalTopicConfig internalTopicConfig = new RepartitionTopicConfig(topic, Collections.emptyMap());
         internalTopicConfig.setNumberOfPartitions(1);
 
-        assertThrows(
-            StreamsException.class,
-            () -> topicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig)));
+        final TaskAssignmentException exception = assertThrows(
+            TaskAssignmentException.class,
+            () -> topicManager.makeReady(Collections.singletonMap(topic, internalTopicConfig))
+        );
+        assertNull(exception.getCause());
+        assertThat(
+            exception.getMessage(),
+            equalTo("Could not create topics within 50 milliseconds." +
+                " This can happen if the Kafka cluster is temporary not available.")

Review comment:
       Typo in the message: s/temporary/temporarily/

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -373,8 +373,15 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
 
         final Set<TaskId> statefulTasks = new HashSet<>();
 
-        final boolean probingRebalanceNeeded =
-            assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);
+        final boolean probingRebalanceNeeded;
+        try {
+            probingRebalanceNeeded = assignTasksToClients(fullMetadata, allSourceTopics, topicGroups, clientMetadataMap, partitionsForTask, statefulTasks);

Review comment:
       Is this part of the initiative to throw a different exception? Could we 
update the summary of this PR?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -477,7 +484,7 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
      * @return map from repartition topic to its partition info
      */
     private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final Map<Integer, TopicsInfo> topicGroups,
-                                                                           final Cluster metadata) {
+                                                                           final Cluster metadata) throws TaskAssignmentException {

Review comment:
       Could we update the Javadoc comment to explain when a `TaskAssignmentException` is thrown?





