mumrah commented on code in PR #19214:
URL: https://github.com/apache/kafka/pull/19214#discussion_r1998877164


##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -464,6 +481,12 @@ public static class LogOffsetPosition {
         public final int position;
         public final int size;
 
+        public LogOffsetPosition(FileChannelRecordBatch batch) {

Review Comment:
   Since this LogOffsetPosition is used like a value class, this might be 
better as an factory method like `LogOffsetPosition#fromBatch` instead of an 
alternate constructor



##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -298,11 +298,28 @@ public int writeTo(TransferableChannel destChannel, int 
offset, int length) thro
      * @return the batch's base offset, its physical position, and its size 
(including log overhead)
      */
     public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int 
startingPosition) {
+        FileChannelRecordBatch prevBatch = null;
+
         for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
-            long offset = batch.lastOffset();
-            if (offset >= targetOffset)
-                return new LogOffsetPosition(batch.baseOffset(), 
batch.position(), batch.sizeInBytes());
+            // This indicates that either the current batch or the previous 
batch 
+            // contains the target we are looking for.
+            if (batch.baseOffset() >= targetOffset) {
+                // Check if the previous batch contains the target
+                if (prevBatch != null && prevBatch.lastOffset() >= 
targetOffset)
+                    return new LogOffsetPosition(prevBatch);
+                else
+                    // If there's no previous batch or the previous batch 
doesn't contain the 
+                    // target, return the current batch
+                    return new LogOffsetPosition(batch);

Review Comment:
   nit: please add curly braces here



##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -298,11 +298,28 @@ public int writeTo(TransferableChannel destChannel, int 
offset, int length) thro
      * @return the batch's base offset, its physical position, and its size 
(including log overhead)
      */
     public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int 
startingPosition) {
+        FileChannelRecordBatch prevBatch = null;
+
         for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
-            long offset = batch.lastOffset();
-            if (offset >= targetOffset)
-                return new LogOffsetPosition(batch.baseOffset(), 
batch.position(), batch.sizeInBytes());
+            // This indicates that either the current batch or the previous 
batch 
+            // contains the target we are looking for.
+            if (batch.baseOffset() >= targetOffset) {

Review Comment:
   Why are we using `>=` here? This means we need to deal with "current batch" 
and "prev batch" in the same loop iteration. This makes the code a bit more 
difficult to understand. Would it be simpler to only use `>` and look at the 
"prev batch" ? 
   
   Maybe I'm missing something that makes this necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to