dengziming commented on a change in pull request #2140:
URL: https://github.com/apache/kafka/pull/2140#discussion_r563471404



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -44,83 +53,435 @@ public FileRecords(File file,
         this.channel = channel;
         this.start = start;
         this.end = end;
+        this.isSlice = isSlice;
+        this.size = new AtomicInteger();
 
-        if (isSlice)
-            this.size = end - start;
-        else
-            this.size = Math.min(channel.size(), end) - start;
+        // set the initial size of the buffer
+        resize();
+    }
+
+    public void resize() throws IOException {
+        if (isSlice) {
+            size.set(end - start);
+        } else {
+            int limit = Math.min((int) channel.size(), end);
+            size.set(limit - start);
+
+            // if this is not a slice, update the file pointer to the end of 
the file
+            // set the file position to the last byte in the file
+            channel.position(limit);
+        }
     }
 
     @Override
     public int sizeInBytes() {
-        return (int) size;
+        return size.get();
+    }
+
+    /**
+     * Get the underlying file.
+     * @return The file
+     */
+    public File file() {
+        return file;
+    }
+
+    /**
+     * Get the underlying file channel.
+     * @return The file channel
+     */
+    public FileChannel channel() {
+        return channel;
+    }
+
+    /**
+     * Read log entries into a given buffer.
+     * @param buffer The buffer to write the entries to
+     * @param position Position in the buffer to read from
+     * @return The same buffer
+     * @throws IOException
+     */
+    public ByteBuffer readInto(ByteBuffer buffer, int position) throws 
IOException {
+        channel.read(buffer, position + this.start);
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Return a slice of records from this instance, which is a view into this 
set starting from the given position
+     * and with the given size limit.
+     *
+     * If the size is beyond the end of the file, the end will be based on the 
size of the file at the time of the read.
+     *
+     * If this message set is already sliced, the position will be taken 
relative to that slicing.
+     *
+     * @param position The start position to begin the read from
+     * @param size The number of bytes after the start position to include
+     * @return A sliced wrapper on this message set limited based on the given 
position and size
+     */
+    public FileRecords read(int position, int size) throws IOException {
+        if (position < 0)
+            throw new IllegalArgumentException("Invalid position: " + 
position);
+        if (size < 0)
+            throw new IllegalArgumentException("Invalid size: " + size);
+
+        final int end;
+        if (this.start + position + size < 0)
+            end = sizeInBytes();
+        else
+            end = Math.min(this.start + position + size, sizeInBytes());
+        return new FileRecords(file, channel, this.start + position, end, 
true);
+    }
+
+    /**
+     * Append log entries to the buffer
+     * @param records The records to append
+     * @return the number of bytes written to the underlying file
+     */
+    public int append(MemoryRecords records) throws IOException {
+        int written = records.writeFullyTo(channel);
+        size.getAndAdd(written);
+        return written;
+    }
+
+    /**
+     * Commit all written data to the physical disk
+     */
+    public void flush() throws IOException {
+        channel.force(true);
+    }
+
+    /**
+     * Close this record set
+     */
+    public void close() throws IOException {
+        flush();
+        trim();
+        channel.close();
+    }
+
+    /**
+     * Delete this message set from the filesystem
+     * @return True iff this message set was deleted.
+     */
+    public boolean delete() {
+        Utils.closeQuietly(channel, "FileChannel");
+        return file.delete();
+    }
+
+    /**
+     * Trim file when close or roll to next file
+     */
+    public void trim() throws IOException {
+        truncateTo(sizeInBytes());
+    }
+
+    /**
+     * Update the file reference (to be used with caution since this does not 
reopen the file channel)
+     * @param file The new file to use
+     */
+    public void setFile(File file) {
+        this.file = file;
+    }
+
+    /**
+     * Rename the file that backs this message set
+     * @throws IOException if rename fails.
+     */
+    public void renameTo(File f) throws IOException {
+        try {
+            Utils.atomicMoveWithFallback(file.toPath(), f.toPath());
+        }  finally {
+            this.file = f;
+        }
+    }
+
+    /**
+     * Truncate this file message set to the given size in bytes. Note that 
this API does no checking that the
+     * given size falls on a valid message boundary.
+     * In some versions of the JDK truncating to the same size as the file 
message set will cause an
+     * update of the files mtime, so truncate is only performed if the 
targetSize is smaller than the
+     * size of the underlying FileChannel.
+     * It is expected that no other threads will do writes to the log when 
this function is called.
+     * @param targetSize The size to truncate to. Must be between 0 and 
sizeInBytes.
+     * @return The number of bytes truncated off
+     */
+    public int truncateTo(int targetSize) throws IOException {
+        int originalSize = sizeInBytes();
+        if (targetSize > originalSize || targetSize < 0)
+            throw new KafkaException("Attempt to truncate log segment to " + 
targetSize + " bytes failed, " +
+                    " size of this log segment is " + originalSize + " 
bytes.");
+        if (targetSize < (int) channel.size()) {
+            channel.truncate(targetSize);
+            channel.position(targetSize);
+            size.set(targetSize);
+        }
+        return originalSize - targetSize;
     }
 
     @Override
     public long writeTo(GatheringByteChannel destChannel, long offset, int 
length) throws IOException {
         long newSize = Math.min(channel.size(), end) - start;
-        if (newSize < size)
+        if (newSize < size.get())
             throw new KafkaException(String.format("Size of FileRecords %s has 
been truncated during write: old size %d, new size %d", file.getAbsolutePath(), 
size, newSize));
 
-        if (offset > size)
-            throw new KafkaException(String.format("The requested offset %d is 
out of range. The size of this FileRecords is %d.", offset, size));
-
         long position = start + offset;
-        long count = Math.min(length, this.size - offset);
+        long count = Math.min(length, size.get());

Review comment:
       Should this be `Math.min(length, size.get() - offset)`?

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -44,83 +53,435 @@ public FileRecords(File file,
         this.channel = channel;
         this.start = start;
         this.end = end;
+        this.isSlice = isSlice;
+        this.size = new AtomicInteger();
 
-        if (isSlice)
-            this.size = end - start;
-        else
-            this.size = Math.min(channel.size(), end) - start;
+        // set the initial size of the buffer
+        resize();
+    }
+
+    public void resize() throws IOException {
+        if (isSlice) {
+            size.set(end - start);
+        } else {
+            int limit = Math.min((int) channel.size(), end);
+            size.set(limit - start);
+
+            // if this is not a slice, update the file pointer to the end of 
the file
+            // set the file position to the last byte in the file
+            channel.position(limit);
+        }
     }
 
     @Override
     public int sizeInBytes() {
-        return (int) size;
+        return size.get();
+    }
+
+    /**
+     * Get the underlying file.
+     * @return The file
+     */
+    public File file() {
+        return file;
+    }
+
+    /**
+     * Get the underlying file channel.
+     * @return The file channel
+     */
+    public FileChannel channel() {
+        return channel;
+    }
+
+    /**
+     * Read log entries into a given buffer.
+     * @param buffer The buffer to write the entries to
+     * @param position Position in the buffer to read from
+     * @return The same buffer
+     * @throws IOException
+     */
+    public ByteBuffer readInto(ByteBuffer buffer, int position) throws 
IOException {
+        channel.read(buffer, position + this.start);
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Return a slice of records from this instance, which is a view into this 
set starting from the given position
+     * and with the given size limit.
+     *
+     * If the size is beyond the end of the file, the end will be based on the 
size of the file at the time of the read.
+     *
+     * If this message set is already sliced, the position will be taken 
relative to that slicing.
+     *
+     * @param position The start position to begin the read from
+     * @param size The number of bytes after the start position to include
+     * @return A sliced wrapper on this message set limited based on the given 
position and size
+     */
+    public FileRecords read(int position, int size) throws IOException {
+        if (position < 0)
+            throw new IllegalArgumentException("Invalid position: " + 
position);
+        if (size < 0)
+            throw new IllegalArgumentException("Invalid size: " + size);
+
+        final int end;
+        if (this.start + position + size < 0)
+            end = sizeInBytes();
+        else
+            end = Math.min(this.start + position + size, sizeInBytes());
+        return new FileRecords(file, channel, this.start + position, end, 
true);
+    }
+
+    /**
+     * Append log entries to the buffer
+     * @param records The records to append
+     * @return the number of bytes written to the underlying file
+     */
+    public int append(MemoryRecords records) throws IOException {
+        int written = records.writeFullyTo(channel);
+        size.getAndAdd(written);
+        return written;
+    }
+
+    /**
+     * Commit all written data to the physical disk
+     */
+    public void flush() throws IOException {
+        channel.force(true);
+    }
+
+    /**
+     * Close this record set
+     */
+    public void close() throws IOException {
+        flush();
+        trim();
+        channel.close();
+    }
+
+    /**
+     * Delete this message set from the filesystem
+     * @return True iff this message set was deleted.
+     */
+    public boolean delete() {
+        Utils.closeQuietly(channel, "FileChannel");
+        return file.delete();
+    }
+
+    /**
+     * Trim file when close or roll to next file
+     */
+    public void trim() throws IOException {
+        truncateTo(sizeInBytes());
+    }
+
+    /**
+     * Update the file reference (to be used with caution since this does not 
reopen the file channel)
+     * @param file The new file to use
+     */
+    public void setFile(File file) {
+        this.file = file;
+    }
+
+    /**
+     * Rename the file that backs this message set
+     * @throws IOException if rename fails.
+     */
+    public void renameTo(File f) throws IOException {
+        try {
+            Utils.atomicMoveWithFallback(file.toPath(), f.toPath());
+        }  finally {
+            this.file = f;
+        }
+    }
+
+    /**
+     * Truncate this file message set to the given size in bytes. Note that 
this API does no checking that the
+     * given size falls on a valid message boundary.
+     * In some versions of the JDK truncating to the same size as the file 
message set will cause an
+     * update of the files mtime, so truncate is only performed if the 
targetSize is smaller than the
+     * size of the underlying FileChannel.
+     * It is expected that no other threads will do writes to the log when 
this function is called.
+     * @param targetSize The size to truncate to. Must be between 0 and 
sizeInBytes.
+     * @return The number of bytes truncated off
+     */
+    public int truncateTo(int targetSize) throws IOException {
+        int originalSize = sizeInBytes();
+        if (targetSize > originalSize || targetSize < 0)
+            throw new KafkaException("Attempt to truncate log segment to " + 
targetSize + " bytes failed, " +
+                    " size of this log segment is " + originalSize + " 
bytes.");
+        if (targetSize < (int) channel.size()) {
+            channel.truncate(targetSize);
+            channel.position(targetSize);
+            size.set(targetSize);
+        }
+        return originalSize - targetSize;
     }
 
     @Override
     public long writeTo(GatheringByteChannel destChannel, long offset, int 
length) throws IOException {
         long newSize = Math.min(channel.size(), end) - start;
-        if (newSize < size)
+        if (newSize < size.get())
             throw new KafkaException(String.format("Size of FileRecords %s has 
been truncated during write: old size %d, new size %d", file.getAbsolutePath(), 
size, newSize));
 
-        if (offset > size)
-            throw new KafkaException(String.format("The requested offset %d is 
out of range. The size of this FileRecords is %d.", offset, size));
-
         long position = start + offset;
-        long count = Math.min(length, this.size - offset);
+        long count = Math.min(length, size.get());

Review comment:
       https://github.com/apache/kafka/pull/9970




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to