vvcephei commented on a change in pull request #9836: URL: https://github.com/apache/kafka/pull/9836#discussion_r561979593
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ########## @@ -637,20 +636,32 @@ private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestamp } else { List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineFetch, recordsRemaining); - if (!records.isEmpty()) { - TopicPartition partition = nextInLineFetch.partition; - List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition); - if (currentRecords == null) { - fetched.put(partition, records); - } else { - // this case shouldn't usually happen because we only send one fetch at a time per partition, - // but it might conceivably happen in some rare cases (such as partition leader changes). - // we have to copy to a new list because the old one may be immutable - List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); - newRecords.addAll(currentRecords); - newRecords.addAll(records); - fetched.put(partition, newRecords); + TopicPartition partition = nextInLineFetch.partition; + + if (subscriptions.isAssigned(partition)) { Review comment: I copied this check from fetchRecords, which says "this can happen when a rebalance happened before fetched records are returned to the consumer's poll call". I.e., it seems like it can actually happen, but a comment is called for, at least. I'll add it. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org