mjsax commented on code in PR #20326:
URL: https://github.com/apache/kafka/pull/20326#discussion_r2512431803
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,121 +466,155 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
- newlyCreatedTopics.addAll(topicsNotReady);
+ // Only consider the not-ready subset on each iteration
+ final Map<String, InternalTopicConfig> notReadyTopicsMap =
topics.entrySet().stream()
+ .filter(e -> topicsNotReady.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ final Set<NewTopic> topicsToCreate =
computeTopicsToCreate(notReadyTopicsMap, tempUnknownTopics);
Review Comment:
Here we compute `topicsToCreate`, but topics that already exist are not
returned, and they are also never removed from `topicsNotReady`.
I think we need to add a step
`topicsNotReady.retainAll(notReadyTopicsMap.keySet())` after this line, plus
modify `notReadyTopicsMap` inside `computeTopicsToCreate` to remove all
entries for the topics we successfully verify.
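A minimal sketch of the suggested step, not the actual `InternalTopicManager` code. The class name, the `runOneIteration` helper, the use of `Integer` partition counts in place of `InternalTopicConfig`, and the `existingTopics` parameter are all assumptions made for illustration. The stand-in `computeTopicsToCreate` removes verified topics from the map it is given, and the caller then shrinks `topicsNotReady` with `retainAll`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class RetainAllSketch {

    // Hypothetical stand-in for computeTopicsToCreate: topics that already
    // exist count as verified and are removed from the passed-in map; the
    // remaining names are returned as the ones that still need creation.
    static Set<String> computeTopicsToCreate(final Map<String, Integer> notReadyTopicsMap,
                                             final Set<String> existingTopics) {
        notReadyTopicsMap.keySet().removeIf(existingTopics::contains);
        return new HashSet<>(notReadyTopicsMap.keySet());
    }

    // One pass of the retry loop: build the not-ready view, verify, then
    // drop every verified topic from topicsNotReady.
    static Set<String> runOneIteration(final Set<String> topicsNotReady,
                                       final Map<String, Integer> topics,
                                       final Set<String> existingTopics) {
        final Map<String, Integer> notReadyTopicsMap = new HashMap<>();
        for (final String name : topicsNotReady) {
            notReadyTopicsMap.put(name, topics.get(name));
        }
        final Set<String> topicsToCreate = computeTopicsToCreate(notReadyTopicsMap, existingTopics);
        // The step proposed in the comment above.
        topicsNotReady.retainAll(notReadyTopicsMap.keySet());
        return topicsToCreate;
    }

    public static void main(final String[] args) {
        final Set<String> topicsNotReady = new HashSet<>(Set.of("topic1", "topic2"));
        final Set<String> toCreate = runOneIteration(
            topicsNotReady, Map.of("topic1", 1, "topic2", 1), Set.of("topic1"));
        System.out.println("still not ready: " + topicsNotReady + ", to create: " + toCreate);
    }
}
```

With this shape, a retry only sees the topics that were not verified in an earlier pass.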
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -461,121 +466,155 @@ public Set<String> makeReady(final Map<String,
InternalTopicConfig> topics) {
// have existed with the expected number of partitions, or some create
topic returns fatal errors.
log.debug("Starting to validate internal topics {} in partition
assignor.", topics);
- long currentWallClockMs = time.milliseconds();
+ final long currentWallClockMs = time.milliseconds();
final long deadlineMs = currentWallClockMs + retryTimeoutMs;
- Set<String> topicsNotReady = new HashSet<>(topics.keySet());
+ final Set<String> topicsNotReady = new HashSet<>(topics.keySet());
final Set<String> newlyCreatedTopics = new HashSet<>();
while (!topicsNotReady.isEmpty()) {
final Set<String> tempUnknownTopics = new HashSet<>();
- topicsNotReady = validateTopics(topicsNotReady, topics,
tempUnknownTopics);
Review Comment:
Cf. my other comment on the test. Here, the old code updates `topicsNotReady`,
and the new set only contains `topic_2` after the first validation try.
Below in the new code, we do not make this update though.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -782,16 +782,18 @@ public void shouldCompleteTopicValidationOnRetry() {
// let the first describe succeed on topic, and fail on topic2, and
then let creation throws topics-existed;
// it should retry with just topic2 and then let it succeed
when(admin.describeTopics(Set.of(topic1, topic2)))
- .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
- mkEntry(topic1, topicDescriptionSuccessFuture),
- mkEntry(topic2, topicDescriptionFailFuture)
- )));
+ .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
+ mkEntry(topic1, topicDescriptionSuccessFuture),
+ mkEntry(topic2, topicDescriptionFailFuture) // first
call: missing
+ )))
+ .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
+ mkEntry(topic1, topicDescriptionSuccessFuture),
Review Comment:
I was looking into this with a debugger, and it seems the current rewrite
introduces a small regression (making this change necessary). In the old code,
we query the number of partitions for both topics and get an error for topic_2,
but because we can get information for topic_1, we mark it as ready (cf. my
other comment). In the new code, we "forget" to mark `topic_1` as ready, and
thus, on retry for `topic_2`, we query the number of partitions for both topics
again (while it would not be necessary to query `topic_1` a second time).
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -782,16 +782,18 @@ public void shouldCompleteTopicValidationOnRetry() {
// let the first describe succeed on topic, and fail on topic2, and
then let creation throws topics-existed;
// it should retry with just topic2 and then let it succeed
when(admin.describeTopics(Set.of(topic1, topic2)))
- .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
- mkEntry(topic1, topicDescriptionSuccessFuture),
- mkEntry(topic2, topicDescriptionFailFuture)
- )));
+ .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
+ mkEntry(topic1, topicDescriptionSuccessFuture),
+ mkEntry(topic2, topicDescriptionFailFuture) // first
call: missing
+ )))
+ .thenAnswer(answer -> new MockDescribeTopicsResult(mkMap(
+ mkEntry(topic1, topicDescriptionSuccessFuture),
+ mkEntry(topic2, topicDescriptionSuccessFuture) //
second call: now exists
+ )));
when(admin.createTopics(Collections.singleton(new NewTopic(topic2,
Optional.of(1), Optional.of((short) 1))
.configs(mkMap(mkEntry(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT),
mkEntry(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"))))))
.thenAnswer(answer -> new
MockCreateTopicsResult(Collections.singletonMap(topic2, topicCreationFuture)));
- when(admin.describeTopics(Collections.singleton(topic2)))
- .thenAnswer(answer -> new
MockDescribeTopicsResult(Collections.singletonMap(topic2,
topicDescriptionSuccessFuture)));
Review Comment:
We should remove the code change from above, and keep this one, because on
retry, we should only query for `topic2`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -673,11 +708,11 @@ private Set<String> validateTopics(final Set<String>
topicsToValidate,
throw new StreamsException(errorMsg);
}
Review Comment:
If the `if` above does not pass, i.e., the number of partitions is as expected
and we don't throw an error, we can remove the current topic from
`topicsMap`, as we successfully verified it. This is the side effect which
avoids querying the same topic multiple times.
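A rough sketch of that side effect, under stated assumptions: `ValidateSideEffectSketch`, `validate`, and the `describedPartitions` map are hypothetical names, partition counts stand in for the real topic configs, and `IllegalStateException` stands in for the `StreamsException` thrown in the actual code.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

class ValidateSideEffectSketch {

    // topicsMap: expected partition count per topic still to be verified.
    // describedPartitions: counts we managed to look up; a topic is absent
    // from this map if describing it failed.
    static void validate(final Map<String, Integer> topicsMap,
                         final Map<String, Integer> describedPartitions) {
        final Iterator<Map.Entry<String, Integer>> it = topicsMap.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, Integer> entry = it.next();
            final Integer actual = describedPartitions.get(entry.getKey());
            if (actual == null) {
                // Unknown topic: keep it in the map so that it is retried.
                continue;
            }
            if (!actual.equals(entry.getValue())) {
                // Stand-in for the StreamsException in the real code.
                throw new IllegalStateException("Unexpected partition count for " + entry.getKey());
            }
            // Verified: remove it so that a retry does not query it again.
            it.remove();
        }
    }

    public static void main(final String[] args) {
        final Map<String, Integer> topicsMap = new HashMap<>(Map.of("topic1", 1, "topic2", 1));
        validate(topicsMap, Map.of("topic1", 1));
        System.out.println("left to verify: " + topicsMap.keySet());
    }
}
```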