lucasbru commented on code in PR #17414:
URL: https://github.com/apache/kafka/pull/17414#discussion_r1796868931


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -53,6 +60,14 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
             return Collections.unmodifiableList(recs);
     }
 
+    /**
+     * Get the next offsets and metadata corresponding to all partitions fetched from the last batch.

Review Comment:
   Why `fetched from the last batch`? When I follow the code, it seems the 
result can include the offsets from multiple batches?
   
   @kirktrue made a fair point that other comments in this class are not good 
either. But for this method specifically, I think it should be clear for which 
topic-partitions we include offsets and for which we don't, because this 
defines which offsets we commit, and committing tends to be important in 
Kafka. So it should be very clear which set of offsets this method returns.
   
   I think we can say that we include offsets for all topic-partitions for 
which the position advanced in this poll call. But then we need to change the 
code a bit (see below).
   
   For me, this is not a minor point.  
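To make the proposed contract concrete, here is a minimal, self-contained sketch (not the Kafka implementation) of collecting next offsets across several fetched batches within one poll, keeping an entry only for partitions whose position advanced. `PartitionFetch`, `NextOffset`, and `collectNextOffsets` are hypothetical stand-ins for `Fetch` and `OffsetAndMetadata`, and topic-partitions are modeled as plain strings.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class NextOffsetsContract {

    // Hypothetical stand-in for OffsetAndMetadata: next offset plus optional last epoch.
    record NextOffset(long offset, Optional<Integer> lastEpoch) { }

    // Hypothetical stand-in for one fetched batch of a single partition.
    record PartitionFetch(String topicPartition, boolean positionAdvanced, NextOffset nextOffset) { }

    // Collects next offsets for every partition whose position advanced in any batch
    // of this poll. A later batch for the same partition overwrites an earlier one,
    // so the map ends up holding the latest position per partition.
    static Map<String, NextOffset> collectNextOffsets(List<PartitionFetch> batches) {
        Map<String, NextOffset> result = new LinkedHashMap<>();
        for (PartitionFetch batch : batches) {
            if (batch.positionAdvanced()) {
                result.put(batch.topicPartition(), batch.nextOffset());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<PartitionFetch> batches = List.of(
            new PartitionFetch("orders-0", true, new NextOffset(10L, Optional.of(3))),
            new PartitionFetch("orders-1", false, new NextOffset(5L, Optional.of(3))),
            new PartitionFetch("orders-0", true, new NextOffset(20L, Optional.of(3))));
        // orders-0 appears once, with the offset from its second batch; orders-1 is absent.
        System.out.println(collectNextOffsets(batches));
    }
}
```

Under this rule the result can span multiple batches, which is why "fetched from the last batch" would not describe it accurately.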



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java:
##########
@@ -95,6 +102,13 @@ public int numRecords() {
         return numRecords;
     }
 
+    /**
+     * @return the next offsets and metadata (last epochs is included)

Review Comment:
   nit: `last epoch` not `last epochs`



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -32,12 +32,19 @@
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
-    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
+    public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap(), Collections.emptyMap());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+    private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
 
+    @Deprecated
     public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
+        this(records, Collections.emptyMap());
+    }
+
+    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records, final Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
         this.records = records;
+        this.nextOffsets = Collections.unmodifiableMap(nextOffsets);

Review Comment:
   Now you are calling `Collections.unmodifiableMap` twice: once in the getter 
and once in the constructor. I would remove it here.
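A minimal sketch of the suggestion: the constructor stores the map as passed, and the getter is the single place that wraps it. `RecordsWithOffsets` is a hypothetical, trimmed-down stand-in for `ConsumerRecords`, with `String`/`Long` in place of `TopicPartition`/`OffsetAndMetadata`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class RecordsWithOffsets {
    private final Map<String, Long> nextOffsets;

    public RecordsWithOffsets(final Map<String, Long> nextOffsets) {
        // Store the reference as-is; read-only protection is applied once, in the getter.
        this.nextOffsets = nextOffsets;
    }

    public Map<String, Long> nextOffsets() {
        // Single wrapping point: callers always get a read-only view.
        return Collections.unmodifiableMap(nextOffsets);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("orders-0", 42L);
        RecordsWithOffsets records = new RecordsWithOffsets(offsets);
        try {
            records.nextOffsets().put("orders-1", 7L);
        } catch (UnsupportedOperationException e) {
            System.out.println("nextOffsets() is read-only");
        }
    }
}
```

An unmodifiable view still reflects later changes to the caller's map. If the constructor should also guard against that, a defensive copy (e.g. `Map.copyOf`, which rejects null keys and values) would be the tool, not a second `unmodifiableMap`.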



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -195,7 +196,7 @@ private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRe
                     metricsManager.recordPartitionLead(tp, lead);
                 }
 
-                return Fetch.forPartition(tp, partRecords, positionAdvanced);
+                return Fetch.forPartition(tp, partRecords, positionAdvanced, new OffsetAndMetadata(nextInLineFetch.nextFetchOffset(), nextInLineFetch.lastEpoch(), ""));

Review Comment:
   Thanks for the update. 
   
   One more thing I noticed (not 100% sure it matters): in this code, even if 
the `position` does not advance (i.e., we haven't really fetched anything), we 
still return the corresponding "nextOffset". I suppose (without checking 
further) that this means `partRecords` is empty here.
   
   I'm not sure we should include in `nextOffsets` a TP whose position did not 
advance at all. After all, don't we promise to return only those topic 
partitions that have fetched something?
   
   This is also why I pushed for a more precise comment above: for exactly which 
topic partitions do we include a next offset, and for which do we not?
   
   Thoughts? @kirktrue @aliehsaeedii 
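One way to express the stricter rule, as a hedged sketch rather than the actual `FetchCollector` change: attach a next offset only when `positionAdvanced` is true, so a partition that fetched nothing contributes no entry. `PartitionResult`, this `forPartition`, and the `String`/`Long` types are hypothetical simplifications of `Fetch.forPartition(...)` and `OffsetAndMetadata`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class GuardedNextOffset {

    // Hypothetical, simplified result of fetching one partition.
    record PartitionResult(List<String> records, boolean positionAdvanced, Map<String, Long> nextOffsets) { }

    static PartitionResult forPartition(String tp, List<String> records,
                                        boolean positionAdvanced, long nextFetchOffset) {
        // Report a next offset only if the position actually moved; otherwise the
        // partition would appear in nextOffsets without having fetched anything.
        Map<String, Long> nextOffsets = positionAdvanced
            ? Map.of(tp, nextFetchOffset)
            : Collections.emptyMap();
        return new PartitionResult(records, positionAdvanced, nextOffsets);
    }

    public static void main(String[] args) {
        System.out.println(forPartition("orders-0", List.of("a", "b"), true, 12L).nextOffsets()); // prints {orders-0=12}
        System.out.println(forPartition("orders-1", List.of(), false, 5L).nextOffsets());        // prints {}
    }
}
```

With this guard, the Javadoc could then state the set precisely: exactly the partitions whose position advanced.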


