mjsax commented on code in PR #19188:
URL: https://github.com/apache/kafka/pull/19188#discussion_r2032155035


##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -294,13 +306,17 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
                                 rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
                         subscriptions.position(entry.getKey(), newPosition);
                         nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+                        numPollRecords++;
+                        recIterator.remove();
                     }
                 }
-                toClear.add(entry.getKey());
+
+                if (entry.getValue().isEmpty()) {

Review Comment:
   ```suggestion
                   if (recIterator.isEmpty()) {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -275,14 +279,22 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
         final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
-        final List<TopicPartition> toClear = new ArrayList<>();
+        long numPollRecords = 0L;
+
+        final Iterator<Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>>> partitionsIter = this.records.entrySet().iterator();
+        while (partitionsIter.hasNext() && numPollRecords < this.maxPollRecords) {
+            Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = partitionsIter.next();
 
-        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
-                final List<ConsumerRecord<K, V>> recs = entry.getValue();
-                for (final ConsumerRecord<K, V> rec : recs) {
+                final ListIterator<ConsumerRecord<K, V>> recIterator = entry.getValue().listIterator();

Review Comment:
   Any particular reason why we use `ListIterator`, and not just `Iterator`?
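   For context, a minimal sketch of the same drain-with-limit pattern using a plain `Iterator`. Both `Iterator` and `ListIterator` support `remove()`; `ListIterator` only adds backward traversal, `set()`, `add()`, and index access, none of which appear in the hunk above. The class `IteratorDrainSketch`, the method `drain`, and its parameters are hypothetical names for illustration, not part of `MockConsumer`. Removing the map entry once its list is empty is an assumption about what the PR does inside the `isEmpty()` branch.

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class IteratorDrainSketch {
       // Hypothetical helper (not MockConsumer code): moves at most maxRecords
       // elements out of `pending` and returns them grouped by key.
       static <K, V> Map<K, List<V>> drain(final Map<K, List<V>> pending, final long maxRecords) {
           final Map<K, List<V>> results = new LinkedHashMap<>();
           long taken = 0L;

           final Iterator<Map.Entry<K, List<V>>> partitions = pending.entrySet().iterator();
           while (partitions.hasNext() && taken < maxRecords) {
               final Map.Entry<K, List<V>> entry = partitions.next();

               // A plain Iterator is enough: only hasNext(), next(), and remove() are used.
               final Iterator<V> recs = entry.getValue().iterator();
               while (recs.hasNext() && taken < maxRecords) {
                   final V rec = recs.next();
                   results.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(rec);
                   recs.remove();
                   taken++;
               }

               // Neither Iterator nor ListIterator defines isEmpty(), so the
               // emptiness check is made on the list itself.
               // Assumption: a fully drained partition is dropped from the map.
               if (entry.getValue().isEmpty()) {
                   partitions.remove();
               }
           }
           return results;
       }
   }
   ```

   With two partitions holding three and two records and a limit of four, this sketch returns all three records of the first partition plus one of the second, and leaves the remaining record in `pending` for the next call.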


