mjsax commented on code in PR #20326:
URL: https://github.com/apache/kafka/pull/20326#discussion_r2512431803


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,121 +466,155 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty()) {
             final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics);
-            newlyCreatedTopics.addAll(topicsNotReady);
 
+            // Only consider the not-ready subset on each iteration
+            final Map<String, InternalTopicConfig> notReadyTopicsMap = topics.entrySet().stream()
+                    .filter(e -> topicsNotReady.contains(e.getKey()))
+                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+            final Set<NewTopic> topicsToCreate = computeTopicsToCreate(notReadyTopicsMap, tempUnknownTopics);

Review Comment:
   Here we compute `topicsToCreate`, but for existing topics we neither return them nor remove them from `topicsNotReady`.
   
   I think we need to add a step `topicsNotReady.retainAll(notReadyTopicsMap.keySet())` after this line, plus modify `notReadyTopicsMap` inside `computeTopicsToCreate` so that it removes the entries for all topics we successfully verify.
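A minimal sketch of the suggested shape, under heavily simplified assumptions: each topic maps to its expected partition count (`Integer`) instead of an `InternalTopicConfig`, the describe result is a plain `Map`, `tempUnknownTopics` is not modeled, and the helper returns topic names rather than `NewTopic`s. `MakeReadySketch`, `iterate`, and `brokerPartitions` are hypothetical names; only the `retainAll` step and the "remove verified entries from the map" side effect come from the comment above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MakeReadySketch {

    // Hypothetical, simplified computeTopicsToCreate(): brokerPartitions stands in for
    // the describe-topics result. Side effect: every verified topic is REMOVED from
    // notReadyTopicsMap, so the caller can tell which topics are still not ready.
    static Set<String> computeTopicsToCreate(final Map<String, Integer> notReadyTopicsMap,
                                             final Map<String, Integer> brokerPartitions) {
        final Set<String> topicsToCreate = new HashSet<>();
        final Iterator<Map.Entry<String, Integer>> it = notReadyTopicsMap.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, Integer> entry = it.next();
            final Integer actual = brokerPartitions.get(entry.getKey());
            if (actual == null) {
                topicsToCreate.add(entry.getKey());   // topic is missing: create it
            } else if (actual.equals(entry.getValue())) {
                it.remove();                          // topic verified: no longer "not ready"
            } else {
                // stand-in for the StreamsException thrown on a partition mismatch
                throw new IllegalStateException("Unexpected number of partitions for " + entry.getKey());
            }
        }
        return topicsToCreate;
    }

    // One pass of the makeReady() loop body, reduced to the steps discussed above.
    static Set<String> iterate(final Set<String> topicsNotReady,
                               final Map<String, Integer> topics,
                               final Map<String, Integer> brokerPartitions) {
        // Collect into a HashMap explicitly: Collectors.toMap(k, v) makes no mutability guarantee.
        final Map<String, Integer> notReadyTopicsMap = topics.entrySet().stream()
            .filter(e -> topicsNotReady.contains(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, HashMap::new));
        final Set<String> topicsToCreate = computeTopicsToCreate(notReadyTopicsMap, brokerPartitions);
        // The suggested extra step: forget every topic verified by computeTopicsToCreate().
        topicsNotReady.retainAll(notReadyTopicsMap.keySet());
        return topicsToCreate;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> topics = Map.of("topic1", 1, "topic2", 1);
        final Map<String, Integer> broker = Map.of("topic1", 1);   // topic2 does not exist yet
        final Set<String> notReady = new HashSet<>(topics.keySet());
        System.out.println(iterate(notReady, topics, broker));    // [topic2]
        System.out.println(notReady);                             // [topic2]
    }
}
```

With `topic1` already present, one pass leaves only `topic2` both in the set to create and in `topicsNotReady`, so the next iteration only looks at `topic2`. One caveat for the real change: `Collectors.toMap` with two arguments does not promise a mutable map, so mutating `notReadyTopicsMap` probably needs an explicit map supplier as above.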



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,121 +466,155 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
         // have existed with the expected number of partitions, or some create topic returns fatal errors.
         log.debug("Starting to validate internal topics {} in partition assignor.", topics);
 
-        long currentWallClockMs = time.milliseconds();
+        final long currentWallClockMs = time.milliseconds();
         final long deadlineMs = currentWallClockMs + retryTimeoutMs;
 
-        Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+        final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
         final Set<String> newlyCreatedTopics = new HashSet<>();
 
         while (!topicsNotReady.isEmpty()) {
             final Set<String> tempUnknownTopics = new HashSet<>();
-            topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics);

Review Comment:
   Cf. my other comment on the test. Here, the old code updates `topicsNotReady`, so after the first validation attempt the reassigned set only contains `topic_2`. The new code below does not make this update.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -782,16 +782,18 @@ public void shouldCompleteTopicValidationOnRetry() {
         // let the first describe succeed on topic, and fail on topic2, and then let creation throws topics-existed;
         // it should retry with just topic2 and then let it succeed
         when(admin.describeTopics(Set.of(topic1, topic2)))
-            .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
-                mkEntry(topic1, topicDescriptionSuccessFuture),
-                mkEntry(topic2, topicDescriptionFailFuture)
-            )));
+                .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
+                        mkEntry(topic1, topicDescriptionSuccessFuture),
+                        mkEntry(topic2, topicDescriptionFailFuture) // first call: missing
+                )))
+                .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
+                        mkEntry(topic1, topicDescriptionSuccessFuture),

Review Comment:
   I was looking into this with a debugger, and it seems the current rewrite introduces a small regression (which makes this change necessary). In the old code, we query the number of partitions for both topics and get an error for `topic_2`, but because we do get the information for `topic_1`, we mark it as ready (cf. my other comment). In the new code, we "forget" to mark `topic_1` as ready, and thus, on the retry for `topic_2`, we query the number of partitions for both topics again (even though it is not necessary to query `topic_1` a second time).
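To make the extra round trip visible without Mockito, here is a hypothetical sketch that replaces the mocked `Admin` with a tiny recording fake. `DescribeCallSketch`, `RecordingAdmin`, `run`, and `forgetVerified` are made up for illustration, and creation is modeled as simply making the topic exist rather than as the topics-existed error used in the test. When verified topics are dropped from the not-ready set, the second describe call covers only `topic2`. When they are kept, both topics are described again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DescribeCallSketch {

    // Hypothetical fake admin client: records the topic set of every describe call.
    static final class RecordingAdmin {
        final Map<String, Integer> partitionsByTopic = new HashMap<>();
        final List<Set<String>> describeCalls = new ArrayList<>();

        // Returns the partition count for each requested topic that exists.
        Map<String, Integer> describe(final Set<String> topics) {
            describeCalls.add(Set.copyOf(topics));
            final Map<String, Integer> found = new HashMap<>();
            for (final String topic : topics) {
                final Integer partitions = partitionsByTopic.get(topic);
                if (partitions != null) {
                    found.put(topic, partitions);
                }
            }
            return found;
        }

        // Simplification: "creating" a topic just makes it exist with one partition.
        void create(final Set<String> topics) {
            topics.forEach(topic -> partitionsByTopic.put(topic, 1));
        }
    }

    // Simplified retry loop. forgetVerified = true mirrors the old code (and the fix):
    // verified topics leave topicsNotReady. forgetVerified = false mirrors the regression.
    static List<Set<String>> run(final boolean forgetVerified) {
        final RecordingAdmin admin = new RecordingAdmin();
        admin.partitionsByTopic.put("topic1", 1);                // topic1 already exists
        final Set<String> topicsNotReady = new HashSet<>(Set.of("topic1", "topic2"));
        while (!topicsNotReady.isEmpty()) {
            final Set<String> missing = new HashSet<>(topicsNotReady);
            missing.removeAll(admin.describe(topicsNotReady).keySet());
            if (missing.isEmpty()) {
                topicsNotReady.clear();                          // all topics verified
            } else {
                admin.create(missing);
                if (forgetVerified) {
                    topicsNotReady.retainAll(missing);           // drop topics verified this round
                }
            }
        }
        return admin.describeCalls;
    }

    public static void main(final String[] args) {
        System.out.println(run(true).get(1).size());   // 1: retry describes only topic2
        System.out.println(run(false).get(1).size());  // 2: retry describes topic1 again
    }
}
```

This is the same pattern the original test stubbing encoded: a second `describeTopics` call with just `topic2`.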



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -782,16 +782,18 @@ public void shouldCompleteTopicValidationOnRetry() {
         // let the first describe succeed on topic, and fail on topic2, and then let creation throws topics-existed;
         // it should retry with just topic2 and then let it succeed
         when(admin.describeTopics(Set.of(topic1, topic2)))
-            .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
-                mkEntry(topic1, topicDescriptionSuccessFuture),
-                mkEntry(topic2, topicDescriptionFailFuture)
-            )));
+                .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
+                        mkEntry(topic1, topicDescriptionSuccessFuture),
+                        mkEntry(topic2, topicDescriptionFailFuture) // first call: missing
+                )))
+                .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
+                        mkEntry(topic1, topicDescriptionSuccessFuture),
+                        mkEntry(topic2, topicDescriptionSuccessFuture) // second call: now exists
+                )));
         when(admin.createTopics(Collections.singleton(new NewTopic(topic2, Optional.of(1), Optional.of((short) 1))
             .configs(mkMap(mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),
                                  mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"))))))
             .thenAnswer(answer -> new MockCreateTopicsResult(Collections.singletonMap(topic2, topicCreationFuture)));
-        when(admin.describeTopics(Collections.singleton(topic2)))
-            .thenAnswer(answer -> new MockDescribeTopicsResult(Collections.singletonMap(topic2, topicDescriptionSuccessFuture)));

Review Comment:
   We should remove the code change above and keep this stubbing, because on retry we should only query for `topic2`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -673,11 +708,11 @@ private Set<String> validateTopics(final Set<String> topicsToValidate,
                     throw new StreamsException(errorMsg);
                 }

Review Comment:
   If the `if` above does not pass, i.e., the number of partitions is as expected and we don't throw an error, we can remove the current topic from `topicsMap`, because we successfully validated it. This side effect is what prevents us from querying the same topic multiple times.



-- 
This is an automated message from the Apache Git Service.