lucasbru commented on code in PR #17414:
URL: https://github.com/apache/kafka/pull/17414#discussion_r1796868931
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -53,6 +60,14 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
return Collections.unmodifiableList(recs);
}
+ /**
+ * Get the next offsets and metadata corresponding to all partitions fetched from the last batch.
Review Comment:
Why `fetched from the last batch`? When I follow the code, it seems the
result can include the offsets from multiple batches?
@kirktrue made a fair point that other comments in this class are not great
either. But for this method specifically, it should be clear for which
topic-partitions we include offsets and for which we don't, since this
defines which offsets we commit, and committing tends to be important in
Kafka. So the set of offsets returned by this method should be stated
precisely.
I think we can say that we include offsets for all topic-partitions for
which the position advanced in this poll call. But then, we need to change the
code a bit (below).
For me, this is not a minor point.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java:
##########
@@ -95,6 +102,13 @@ public int numRecords() {
return numRecords;
}
+ /**
+ * @return the next offsets and metadata (last epochs is included)
Review Comment:
nit: `last epoch` not `last epochs`
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -32,12 +32,19 @@
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
- public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
+ public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap(), Collections.emptyMap());
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+ private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
+ @Deprecated
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
+ this(records, Collections.emptyMap());
+ }
+
+ public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records, final Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
this.records = records;
+ this.nextOffsets = Collections.unmodifiableMap(nextOffsets);
Review Comment:
Now you are calling `Collections.unmodifiableMap` twice: once in the getter
and once in the constructor. I would remove it here.
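A minimal sketch of the single-wrap variant suggested here. `NextOffsetsHolder` is a hypothetical stand-in for `ConsumerRecords`, the getter name `nextOffsets()` is an assumption, and plain `String`/`Long` replace `TopicPartition`/`OffsetAndMetadata` so that the example is self-contained:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ConsumerRecords; only the nextOffsets handling is modeled.
class NextOffsetsHolder {
    private final Map<String, Long> nextOffsets;

    NextOffsetsHolder(Map<String, Long> nextOffsets) {
        // Store the reference as-is: no wrapping in the constructor.
        this.nextOffsets = nextOffsets;
    }

    Map<String, Long> nextOffsets() {
        // The only wrap: callers receive a read-only view of the map.
        return Collections.unmodifiableMap(nextOffsets);
    }
}

class NextOffsetsHolderDemo {
    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);
        NextOffsetsHolder holder = new NextOffsetsHolder(offsets);
        try {
            // Writes through the getter's view are rejected.
            holder.nextOffsets().put("topic-1", 7L);
        } catch (UnsupportedOperationException e) {
            System.out.println("getter returns a read-only view");
        }
    }
}
```

Wrapping only in the getter still protects callers from mutating the map through the returned view. Neither variant copies the map, so a caller that keeps the original reference can still change it.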
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -195,7 +196,7 @@ private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRe
metricsManager.recordPartitionLead(tp, lead);
}
- return Fetch.forPartition(tp, partRecords, positionAdvanced);
+ return Fetch.forPartition(tp, partRecords, positionAdvanced, new OffsetAndMetadata(nextInLineFetch.nextFetchOffset(), nextInLineFetch.lastEpoch(), ""));
Review Comment:
Thanks for the update.
One more thing that I noticed - not 100% sure if it matters. In this code,
even if the `position` does not advance (i.e., we haven't really fetched
anything), we will still return the corresponding "nextOffset". I suppose
(without checking further) that this would mean that `partRecords` is empty
here.
I'm not sure if we should include a TP that did not advance its position at
all in `nextOffsets`. After all, we promise to return only those topic
partitions that have fetched something?
This is also why I pressed for a more precise comment above: precisely for
which topic partitions do we include a next offset, and for which do we not
include one?
Thoughts? @kirktrue @aliehsaeedii
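A minimal sketch of the behavior in question, where a next offset is reported only for partitions whose position advanced. `PartitionFetch` and `NextOffsetsCollector` are hypothetical stand-ins, not the actual `Fetch`/`FetchCollector` code, and plain `String`/`long` replace `TopicPartition`/`OffsetAndMetadata`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-partition fetch result; not Kafka's Fetch class.
class PartitionFetch {
    final String partition;
    final boolean positionAdvanced;
    final long nextOffset;

    PartitionFetch(String partition, boolean positionAdvanced, long nextOffset) {
        this.partition = partition;
        this.positionAdvanced = positionAdvanced;
        this.nextOffset = nextOffset;
    }
}

// Hypothetical collector that builds the nextOffsets map for one poll call.
class NextOffsetsCollector {
    static Map<String, Long> collect(List<PartitionFetch> fetches) {
        Map<String, Long> nextOffsets = new HashMap<>();
        for (PartitionFetch fetch : fetches) {
            // Skip partitions whose position did not advance in this poll.
            if (fetch.positionAdvanced) {
                nextOffsets.put(fetch.partition, fetch.nextOffset);
            }
        }
        return nextOffsets;
    }
}

class NextOffsetsCollectorDemo {
    public static void main(String[] args) {
        List<PartitionFetch> fetches = new ArrayList<>();
        fetches.add(new PartitionFetch("topic-0", true, 10L));
        fetches.add(new PartitionFetch("topic-1", false, 5L));
        // Only the partition that advanced ends up in the map.
        System.out.println(NextOffsetsCollector.collect(fetches));
    }
}
```

With this filter in place, the Javadoc could state that offsets are included for exactly those topic-partitions whose position advanced in the poll call.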
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.