kirktrue commented on code in PR #17414:
URL: https://github.com/apache/kafka/pull/17414#discussion_r1794387265


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -21,23 +21,33 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
  * particular topic. There is one {@link ConsumerRecord} list for every topic
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-    public static final ConsumerRecords<Object, Object> EMPTY = new 
ConsumerRecords<>(Collections.emptyMap());
+    public static final ConsumerRecords<Object, Object> EMPTY = new 
ConsumerRecords<>(Collections.emptyMap(), Collections.emptyMap());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
 
+    @Deprecated
     public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> 
records) {
         this.records = records;
+        this.nextOffsets = new HashMap<>();

Review Comment:
   Since the `nextOffsets` map isn't modified after use, could this be:
   
   ```suggestion
           this.nextOffsets = Collections.emptyMap();
   ```
   
   Or, the entire constructor could be:
   
   ```java
           this(nextOffsets, Collections.emptyMap());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -229,6 +229,8 @@ public synchronized ConsumerRecords<K, V> poll(final 
Duration timeout) {
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new 
HashMap<>();
+        final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = 
new HashMap<>();
+

Review Comment:
   Super nit: remove the extra newline:
   
   ```suggestion
           final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = 
new HashMap<>();
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -53,6 +63,13 @@ public List<ConsumerRecord<K, V>> records(TopicPartition 
partition) {
             return Collections.unmodifiableList(recs);
     }
 
+    /**
+     * Get just the next offsets that the consumer will consume

Review Comment:
   In her defense, the comments for the existing `records()` method are 
similarly terse 🤷‍♂️



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -21,23 +21,33 @@
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
  * particular topic. There is one {@link ConsumerRecord} list for every topic
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-    public static final ConsumerRecords<Object, Object> EMPTY = new 
ConsumerRecords<>(Collections.emptyMap());
+    public static final ConsumerRecords<Object, Object> EMPTY = new 
ConsumerRecords<>(Collections.emptyMap(), Collections.emptyMap());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
 
+    @Deprecated
     public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> 
records) {
         this.records = records;
+        this.nextOffsets = new HashMap<>();
+    }
+
+    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> 
records, final Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
+        this.records = records;
+        this.nextOffsets = nextOffsets;

Review Comment:
   Should this be immutable?
   
   ```suggestion
           this.nextOffsets = Collections.unmodifiableMap(nextOffsets);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##########
@@ -583,7 +583,7 @@ public synchronized ConsumerRecords<K, V> poll(final 
Duration timeout) {
                 final ShareFetch<K, V> fetch = pollForFetches(timer);
                 if (!fetch.isEmpty()) {
                     currentFetch = fetch;
-                    return new ConsumerRecords<>(fetch.records());
+                    return new ConsumerRecords<>(fetch.records(), 
Collections.emptyMap());

Review Comment:
   Any reason to not leave this line as-is so that it the single-arg 
`ConsumerRecords` is used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to