junrao commented on code in PR #19581:
URL: https://github.com/apache/kafka/pull/19581#discussion_r2075993932


##########
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java:
##########
@@ -1068,6 +1073,148 @@ public void testUnsupportedCompress() {
         });
     }
 
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSlice(Args args) throws IOException {
+        // Create a MemoryRecords instance with multiple batches. Prior to RecordBatch.MAGIC_VALUE_V2,
+        // every append in a batch is a new batch. From RecordBatch.MAGIC_VALUE_V2 onwards, we can have
+        // multiple batches in a single MemoryRecords instance. Though with compression, we can have
+        // multiple appends resulting in a single batch prior to RecordBatch.MAGIC_VALUE_V2 as well.
+        LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
+        recordsPerOffset.put(args.firstOffset, 3);
+        recordsPerOffset.put(args.firstOffset + 6L, 8);
+        recordsPerOffset.put(args.firstOffset + 15L, 4);
+        MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
+
+        // Test slicing from start
+        Records sliced = records.slice(0, records.sizeInBytes());
+        assertEquals(records.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(records.validBytes(), ((MemoryRecords) sliced).validBytes());
+        TestUtils.checkEquals(records.batches(), ((MemoryRecords) sliced).batches());
+
+        List<RecordBatch> items = batches(records);
+        // Test slicing off the first message.
+        RecordBatch first = items.get(0);
+        sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes());
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+
+        // Read from the second message with a size past the end of the records.
+        sliced = records.slice(first.sizeInBytes(), records.sizeInBytes());
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+
+        // Read from the second message where position + size overflows.
+        sliced = records.slice(first.sizeInBytes(), Integer.MAX_VALUE);
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+
+        // Read a single message starting from the second message.
+        RecordBatch second = items.get(1);
+        sliced = records.slice(first.sizeInBytes(), second.sizeInBytes());
+        assertEquals(second.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(Collections.singletonList(second), batches(sliced), "Read a single message starting from the second message");
+
+        // Read from an already sliced view.
+        List<RecordBatch> remainingItems = IntStream.range(0, items.size())
+            .filter(i -> i != 0 && i != 1)
+            .mapToObj(items::get)
+            .collect(Collectors.toList());
+        int remainingSize = remainingItems.stream().mapToInt(RecordBatch::sizeInBytes).sum();
+        sliced = records.slice(first.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes())
+                        .slice(second.sizeInBytes(), records.sizeInBytes() - first.sizeInBytes() - second.sizeInBytes());
+        assertEquals(remainingSize, sliced.sizeInBytes());
+        assertEquals(remainingItems, batches(sliced), "Read starting from the third message");
+
+        // Read from the second message with a size past the end of the records, on an already sliced view.
+        sliced = records.slice(1, records.sizeInBytes() - 1)
+            .slice(first.sizeInBytes() - 1, records.sizeInBytes());
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+
+        // Read from the second message where position + size overflows, on an already sliced view.
+        sliced = records.slice(1, records.sizeInBytes() - 1)
+            .slice(first.sizeInBytes() - 1, Integer.MAX_VALUE);
+        assertEquals(records.sizeInBytes() - first.sizeInBytes(), sliced.sizeInBytes());
+        assertEquals(items.subList(1, items.size()), batches(sliced), "Read starting from the second message");
+        assertTrue(((MemoryRecords) sliced).validBytes() <= sliced.sizeInBytes());
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSliceInvalidPosition(Args args) {
+        MemoryRecords records = createMemoryRecords(args, Map.of(args.firstOffset, 1));
+        assertThrows(IllegalArgumentException.class, () -> records.slice(-1, records.sizeInBytes()));
+        assertThrows(IllegalArgumentException.class, () -> records.slice(records.sizeInBytes() + 1, records.sizeInBytes()));
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSliceInvalidSize(Args args) {
+        MemoryRecords records = createMemoryRecords(args, Map.of(args.firstOffset, 1));
+        assertThrows(IllegalArgumentException.class, () -> records.slice(0, -1));
+    }
+
+    @Test
+    public void testSliceEmptyRecords() {
+        MemoryRecords empty = MemoryRecords.EMPTY;
+        Records sliced = empty.slice(0, 0);
+        assertEquals(0, sliced.sizeInBytes());
+        assertEquals(0, batches(sliced).size());
+    }
+
+    /**
+     * Test slice when already sliced memory records have a start position greater than the
+     * available bytes in the memory records.
+     */
+    @ParameterizedTest
+    @ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+    public void testSliceForAlreadySlicedMemoryRecords(Args args) throws IOException {
+        LinkedHashMap<Long, Integer> recordsPerOffset = new LinkedHashMap<>();
+        recordsPerOffset.put(args.firstOffset, 5);
+        recordsPerOffset.put(args.firstOffset + 5L, 10);
+        recordsPerOffset.put(args.firstOffset + 15L, 12);
+        recordsPerOffset.put(args.firstOffset + 27L, 4);
+
+        MemoryRecords records = createMemoryRecords(args, recordsPerOffset);
+        List<RecordBatch> items = batches(records.slice(0, records.sizeInBytes()));
+
+        // Slice from third message until the end.
+        int position = IntStream.range(0, 2).map(i -> items.get(i).sizeInBytes()).sum();
+        Records sliced = records.slice(position, records.sizeInBytes() - position);
+        assertEquals(records.sizeInBytes() - position, sliced.sizeInBytes());
+        assertEquals(items.subList(2, items.size()), batches(sliced), "Read starting from the third message");
+
+        // Further slice the already sliced memory records, from fourth message until the end. Now the
+        // bytes available in the sliced records are less than the start position. However, the
+        // position to slice is relative hence reset position to second message in the sliced memory

Review Comment:
   > Now the bytes available in the sliced records are less than the start position.
   
   Hmm, which start position does it refer to? Is that important?
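   
   For illustration only, a small standalone sketch (using `java.nio.ByteBuffer` as an analogy; this is not the actual `MemoryRecords.slice` implementation) of how a position passed to a nested slice is interpreted relative to the enclosing view, not the original buffer:
   
   ```java
   import java.nio.ByteBuffer;
   
   public class RelativeSliceDemo {
       public static void main(String[] args) {
           ByteBuffer original = ByteBuffer.allocate(100);
   
           // First view drops the leading 30 bytes of the original buffer.
           original.position(30);
           ByteBuffer firstView = original.slice();   // capacity 70
   
           // Position 20 here is relative to firstView, i.e. it maps to
           // absolute position 50 in the original buffer.
           firstView.position(20);
           ByteBuffer secondView = firstView.slice(); // capacity 50
   
           System.out.println(firstView.capacity());  // prints 70
           System.out.println(secondView.capacity()); // prints 50
       }
   }
   ```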



##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java:
##########
@@ -312,7 +312,7 @@ public ByteBuffer buffer() {
     * @return A sliced buffer on this message set limited based on the given position and size
      */
     @Override
-    public MemoryRecords slice(int position, int size) {
+    public Records slice(int position, int size) {

Review Comment:
   Actually, it seems that our existing code already returns a subclass when implementing the Records interface.
   For example, Records has
   `Iterable<? extends RecordBatch> batches()`
   
   and FileRecords has
   `public Iterable<FileChannelRecordBatch> batches()`
   
   Perhaps it's better to define the interface as
   `<R extends Records> R slice(int position, int size) throws IOException`
   and return `MemoryRecords` here. This way, we can avoid the type casting.
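   
   For illustration only, a minimal sketch of the covariant-return pattern the `batches()` example relies on; `SliceableRecords` and `HeapRecords` are hypothetical names, not the real Kafka types:
   
   ```java
   import java.io.IOException;
   
   // The interface declares the broad return type...
   interface SliceableRecords {
       SliceableRecords slice(int position, int size) throws IOException;
   }
   
   class HeapRecords implements SliceableRecords {
       private final byte[] bytes;
   
       HeapRecords(byte[] bytes) {
           this.bytes = bytes;
       }
   
       // ...and the implementation narrows it (a covariant return type),
       // so callers holding a HeapRecords reference never need a cast.
       @Override
       public HeapRecords slice(int position, int size) {
           int end = Math.min(position + size, bytes.length);
           byte[] view = new byte[end - position];
           System.arraycopy(bytes, position, view, 0, view.length);
           return new HeapRecords(view);
       }
   }
   ```
   
   The generic form `<R extends Records> R slice(int position, int size)` would also compile, but each implementation would then need an unchecked cast to `R`, so a plain covariant override may be the simpler way to avoid casts at call sites.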



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
