mjsax commented on code in PR #19188: URL: https://github.com/apache/kafka/pull/19188#discussion_r2032155035
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -294,13 +306,17 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                             rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
                         nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+                        numPollRecords++;
+                        recIterator.remove();
                     }
                 }
-                toClear.add(entry.getKey());
+
+                if (entry.getValue().isEmpty()) {

Review Comment:
```suggestion
                if (recIterator.isEmpty()) {
```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -275,14 +279,22 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
         final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
-        final List<TopicPartition> toClear = new ArrayList<>();
+        long numPollRecords = 0L;
+
+        final Iterator<Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>>> partitionsIter = this.records.entrySet().iterator();
+        while (partitionsIter.hasNext() && numPollRecords < this.maxPollRecords) {
+            Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = partitionsIter.next();
-        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
-                final List<ConsumerRecord<K, V>> recs = entry.getValue();
-                for (final ConsumerRecord<K, V> rec : recs) {
+                final ListIterator<ConsumerRecord<K, V>> recIterator = entry.getValue().listIterator();

Review Comment:
Any particular reason why we use `ListIterator`, and not just `Iterator`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org