jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r537799341
##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########

@@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             if (!retainedRecords.isEmpty()) {
                 if (writeOriginalBatch) {
                     batch.writeTo(bufferOutputStream);
-                    filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
+                    filterResult.updateRetainedBatchMetadata(batch, retainedRecords.get(0).offset(), retainedRecords.size(), false);

Review comment:
   I see. I guess the goal here was to give the actual offset of the first retained record. If we choose to give the batch base offset instead, we may return an offset that is still not the actual first offset (but it will be much closer than 0). If this behavior is acceptable we can go with this option, but it will not match the behavior of consumer.offsetsForTimes() with timestamp 0L, as mentioned in KAFKA-7556.

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########

@@ -386,6 +390,10 @@ public long maxOffset() {
         return maxOffset;
     }

+    public long minOffset() {

Review comment:
   I've added some unresolved comments about min offsets above.
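Regarding the offsetsForTimes() point in the first comment above, the following is a minimal, hypothetical sketch of the consumer-side lookup that KAFKA-7556 compares against; the broker address, topic name, and class name are assumptions, not taken from the PR. offsetsForTimes() returns the earliest offset whose record timestamp is greater than or equal to the given timestamp, so with timestamp 0L it effectively resolves to the first record actually present in the partition (assuming non-negative timestamps), whereas a retained batch's base offset can be lower than that when leading records were filtered out.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical illustration of the consumer behavior referenced in the review comment.
public class EarliestRecordOffsetLookup {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");           // assumed broker address
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("my-topic", 0);      // hypothetical topic/partition

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // offsetsForTimes with timestamp 0L resolves to the earliest record whose
            // timestamp is >= 0, i.e. the first record actually present in the partition.
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, 0L));

            OffsetAndTimestamp earliest = result.get(tp);
            if (earliest != null) {
                System.out.printf("first record offset=%d, timestamp=%d%n",
                        earliest.offset(), earliest.timestamp());
            }
        }
    }
}
```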