vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r438477719
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState
                 .flatMap(Collection::stream)
                 .collect(Collectors.toList());

-            final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+            final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+            final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+            for (final TopicPartition changelog : allChangelogPartitions) {
+                if (newlyCreatedChangelogs.contains(changelog.topic())) {
+                    newlyCreatedChangelogPartitions.add(changelog);
+                } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+                    preexistingSourceChangelogPartitions.add(changelog);
+                } else {
+                    preexistingChangelogPartitions.add(changelog);
+                }
+            }
+
+            // Make the listOffsets request first so it can fetch the offsets for non-source changelogs
+            // asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+                fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);

-            final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-            allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+            final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+                fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());

-            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-                fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+            final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);

-            allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+            allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+                changelogsByStatefulTask,
+                endOffsets,
+                sourceChangelogEndOffsets,
+                newlyCreatedChangelogPartitions);
             fetchEndOffsetsSuccessful = true;
-        } catch (final StreamsException e) {
+        } catch (final StreamsException | TimeoutException e) {

Review comment:
       That sounds reasonable, but I think if you throw an exception in the assignor, it just calls the assignor again in a tight loop, which seems worse than backing off and trying again later. If you want to propose this change, maybe you can verify what exactly happens if we throw.
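For reference, a minimal standalone sketch of the ordering the in-diff comment describes: kick off the asynchronous Admin#listOffsets request for the regular changelogs first, make the blocking Consumer#committed call for the optimized source-topic changelogs while that request is in flight, and only then resolve the future. This is not the actual ClientUtils / fetchEndOffsetsFuture / fetchCommittedOffsets code from the PR; the class name, the 30-second bound on the future, and the zero default for partitions with no committed offset are illustrative assumptions.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

public class ChangelogEndOffsetFetchSketch {

    public static Map<TopicPartition, Long> fetchChangelogEndOffsets(
            final Admin adminClient,
            final Consumer<byte[], byte[]> mainConsumer,
            final Set<TopicPartition> nonSourceChangelogs,
            final Set<TopicPartition> sourceChangelogs)
            throws InterruptedException, ExecutionException, TimeoutException {

        // 1. Non-blocking: request the end (latest) offsets of the regular changelog partitions.
        //    The request is sent now, but we do not wait for the response yet.
        final Map<TopicPartition, OffsetSpec> latestSpecs = nonSourceChangelogs.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
            adminClient.listOffsets(latestSpecs).all();

        // 2. Blocking: for optimized source-topic changelogs the relevant "end offset" is the
        //    group's committed offset on the source topic, fetched via the main consumer.
        final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer.committed(sourceChangelogs);

        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
        // Assumption: treat a partition with no committed offset as 0 for illustration.
        committed.forEach((tp, offset) -> endOffsets.put(tp, offset == null ? 0L : offset.offset()));

        // 3. Only now resolve the listOffsets future; the 30s bound is an arbitrary illustrative value.
        endOffsetsFuture.get(30, TimeUnit.SECONDS)
            .forEach((tp, info) -> endOffsets.put(tp, info.offset()));
        return endOffsets;
    }
}

The point of issuing the admin request before the blocking consumer call is that the two broker round trips overlap instead of running back to back, which is what the reordering in the diff is after.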