jeffkbkim commented on code in PR #17700: URL: https://github.com/apache/kafka/pull/17700#discussion_r1917124898
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -456,6 +457,36 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
     }
 
+    private Set<Integer> nodesWithBufferedPartitions(Set<TopicPartition> buffered, long currentTimeMs) {
+        Set<Integer> nodesWithBufferedPartitions = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            if (!subscriptions.isAssigned(partition)) {
+                // It's possible that a partition with buffered data from a previous request is now no longer
+                // assigned to the consumer, in which case just skip this partition.
+                continue;
+            }
+
+            SubscriptionState.FetchPosition position = subscriptions.position(partition);

Review Comment:
   In nodesWithBufferedPartitions, we skip if the position is null or if the leader is empty. Previously, we would throw an IllegalStateException (L409) or log and request a metadata update for buffered partitions. Is there a reason for this divergence, or should we merge the two pieces of logic?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -456,6 +457,36 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
     }
 
+    private Set<Integer> nodesWithBufferedPartitions(Set<TopicPartition> buffered, long currentTimeMs) {
+        Set<Integer> nodesWithBufferedPartitions = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            if (!subscriptions.isAssigned(partition)) {

Review Comment:
   Can we add test cases for all of the partitions we skip?
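The skip conditions under discussion can be modeled in isolation. The sketch below is a hypothetical, standalone approximation (the class, record, and method signatures are invented for illustration; this is not the real `SubscriptionState`/`FetchPosition` API): a buffered partition contributes its leader's node id only when it is still assigned and has a valid position with a known leader, and the comments mark where the older code path diverged (threw, or logged and requested a metadata update) instead of silently skipping.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative-only model of the skip logic in nodesWithBufferedPartitions.
public class BufferedNodesSketch {

    /** leaderId == null models "leader unknown"; a missing map entry models "no valid position". */
    public record FetchPosition(Integer leaderId) { }

    public static Set<Integer> nodesWithBufferedPartitions(Set<String> buffered,
                                                           Set<String> assigned,
                                                           Map<String, FetchPosition> positions) {
        Set<Integer> nodes = new HashSet<>();
        for (String partition : buffered) {
            if (!assigned.contains(partition))
                continue; // partition was revoked after its data was buffered; skip it
            FetchPosition position = positions.get(partition);
            if (position == null)
                continue; // no valid position; the pre-existing path threw IllegalStateException here
            if (position.leaderId() == null)
                continue; // leader unknown; the pre-existing path logged and requested a metadata update
            nodes.add(position.leaderId());
        }
        return nodes;
    }

    public static void main(String[] args) {
        Set<String> assigned = Set.of("t-0", "t-1", "t-2");
        Map<String, FetchPosition> positions = new HashMap<>();
        positions.put("t-0", new FetchPosition(1));    // counted: assigned, leader known
        positions.put("t-1", new FetchPosition(null)); // skipped: leader unknown
        positions.put("t-3", new FetchPosition(2));    // skipped: not assigned
        Set<String> buffered = Set.of("t-0", "t-1", "t-2", "t-3"); // t-2 skipped: no position
        System.out.println(nodesWithBufferedPartitions(buffered, assigned, positions)); // prints [1]
    }
}
```

Each `continue` above corresponds to one of the skip cases the reviewer wants covered by tests, so a parameterized test could drive one partition through each branch.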
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,7 +388,21 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
+
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = Set.copyOf(subscriptions.fetchablePartitions(tp -> !buffered.contains(tp)));
+
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();

Review Comment:
   To confirm: after we return here, prepareFetchRequests would be invoked again in the next poll()?
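The buffered/unbuffered split and the early return can be sketched with plain collections. Everything below is illustrative (`String` stands in for `TopicPartition`, and the returned map is a placeholder for the real per-node `FetchRequestData`), but it mirrors the control flow of the diff: if every fetchable partition already has buffered data, return an empty map and let the next poll() call back into this method.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative-only model of the early-return shape in prepareFetchRequests.
public class PrepareFetchSketch {

    // Mirrors subscriptions.fetchablePartitions(tp -> !buffered.contains(tp))
    public static Set<String> unbufferedPartitions(List<String> fetchable, Set<String> buffered) {
        Predicate<String> notBuffered = tp -> !buffered.contains(tp);
        return fetchable.stream()
                .filter(notBuffered)
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static Map<String, String> prepareFetchRequests(List<String> fetchable, Set<String> buffered) {
        Set<String> unbuffered = unbufferedPartitions(fetchable, buffered);
        if (unbuffered.isEmpty()) {
            // Nothing needs fetching right now; the next poll() invokes this method again,
            // by which time some buffered data may have been drained by the application.
            return Collections.emptyMap();
        }
        // ... the real code builds per-node FetchRequestData here; this is a stand-in ...
        return Map.of("placeholder-node", String.join(",", unbuffered));
    }

    public static void main(String[] args) {
        System.out.println(unbufferedPartitions(List.of("t-0", "t-1", "t-2"), Set.of("t-1"))); // [t-0, t-2]
        System.out.println(prepareFetchRequests(List.of("t-0"), Set.of("t-0")));               // {}
    }
}
```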
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -3757,28 +3764,6 @@ private FetchResponse fetchResponse(TopicIdPartition tp, MemoryRecords records,
         return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions));
     }
 
-    private FetchResponse fetchResponse2(TopicIdPartition tp1, MemoryRecords records1, long hw1,
-                                         TopicIdPartition tp2, MemoryRecords records2, long hw2) {
-        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new HashMap<>();
-        partitions.put(tp1,
-            new FetchResponseData.PartitionData()
-                .setPartitionIndex(tp1.topicPartition().partition())
-                .setErrorCode(Errors.NONE.code())
-                .setHighWatermark(hw1)
-                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-                .setLogStartOffset(0)
-                .setRecords(records1));
-        partitions.put(tp2,
-            new FetchResponseData.PartitionData()
-                .setPartitionIndex(tp2.topicPartition().partition())
-                .setErrorCode(Errors.NONE.code())
-                .setHighWatermark(hw2)
-                .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-                .setLogStartOffset(0)
-                .setRecords(records2));
-        return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions));
-    }
-
     /**
      * Assert that the {@link Fetcher#collectFetch() latest fetch} does not contain any
      * {@link Fetch#records() user-visible records}, did not

Review Comment:
   nit: prepareOffsetsForLeaderEpochResponse is also not used (L3623). Can we remove it as well?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,7 +388,21 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
+
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = Set.copyOf(subscriptions.fetchablePartitions(tp -> !buffered.contains(tp)));
+
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
+
+        Set<Integer> nodesWithBufferedPartitions = nodesWithBufferedPartitions(buffered, currentTimeMs);

Review Comment:
   Makes sense. Can you create one?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
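The hunk stops before showing how `nodesWithBufferedPartitions` is consumed. A plausible use, assumed here rather than confirmed by the diff, is to avoid issuing new fetch requests to nodes that already hold buffered data for this consumer. A minimal sketch under that assumption, with invented stand-in types (`String` for `TopicPartition`, `Integer` for a node id):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Assumed-only sketch: filter out partitions whose leader node already has buffered data.
public class NodeSkipSketch {

    public static List<String> partitionsToFetch(Map<String, Integer> leaderOf,
                                                 Set<Integer> nodesWithBuffered) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> e : leaderOf.entrySet()) {
            if (nodesWithBuffered.contains(e.getValue()))
                continue; // this node already has data waiting to be drained; don't pile on more
            result.add(e.getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        // t-0's leader (node 1) still holds buffered data, so only t-2 is fetched.
        System.out.println(partitionsToFetch(Map.of("t-0", 1, "t-2", 2), Set.of(1))); // [t-2]
    }
}
```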