ibessonov commented on code in PR #2390:
URL: https://github.com/apache/ignite-3/pull/2390#discussion_r1298177556
##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileWriter.java:
##########
@@ -17,99 +17,87 @@
package org.apache.ignite.internal.network.file;
+import java.io.BufferedOutputStream;
+import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Path;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import
org.apache.ignite.internal.network.file.exception.FileValidationException;
import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
/**
- * Chunked file writer. Writes chunks in order. If a chunk is not in order, it
is stored in a queue. When the next chunk is written, the
- * queues are checked for the next chunk. If the next chunk is found, it is
written to the file and removed from the queue. If the next
- * chunk is not found, the file is not written to. The writer is not
thread-safe.
+ * Chunked file writer. Writes chunks to a file. Checks that the file size and
chunk numbers are valid.
*/
class ChunkedFileWriter implements AutoCloseable {
- private static final int UNKNOWN_FILE_SIZE = -1;
+ private final BufferedOutputStream stream;
- private final RandomAccessFile raf;
+ private final long expectedFileLength;
- private long fileSize;
+ private long bytesWritten = 0;
- private final Queue<FileChunkMessage> chunks = new
PriorityQueue<>(FileChunkMessage.COMPARATOR);
-
- private ChunkedFileWriter(RandomAccessFile raf, long fileSize) {
- if (fileSize < UNKNOWN_FILE_SIZE) {
- throw new IllegalArgumentException("File size must be
non-negative");
- }
-
- this.raf = raf;
- this.fileSize = fileSize;
- }
+ private int expectedNextChunkNumber = 0;
/**
- * Opens a file with unknown size for writing.
- *
- * @param path File path.
- * @return Chunked file writer.
- * @throws FileNotFoundException If the file is not found.
+ * Lock to synchronize access to the file. We don't write to the file
simultaneously, but the next chunk number and bytes written can be
+ * updated from different threads.
*/
- static ChunkedFileWriter open(Path path) throws FileNotFoundException {
- return new ChunkedFileWriter(new RandomAccessFile(path.toFile(),
"rw"), UNKNOWN_FILE_SIZE);
+ private final Lock lock = new ReentrantLock();
+
+ private ChunkedFileWriter(BufferedOutputStream stream, long
expectedFileLength) {
+ this.stream = stream;
+ this.expectedFileLength = expectedFileLength;
}
/**
* Opens a file with known size for writing.
*
- * @param path File path.
- * @param fileSize File size. If the file size is unknown, pass {@link
#UNKNOWN_FILE_SIZE}.
+ * @param file File path.
+ * @param expectedFileLength Expected file size.
* @return Chunked file writer.
* @throws FileNotFoundException If the file is not found.
*/
- static ChunkedFileWriter open(Path path, long fileSize) throws
FileNotFoundException {
- return new ChunkedFileWriter(new RandomAccessFile(path.toFile(),
"rw"), fileSize);
+ static ChunkedFileWriter open(File file, long expectedFileLength) throws
IOException {
+ return new ChunkedFileWriter(new BufferedOutputStream(new
FileOutputStream(file)), expectedFileLength);
}
/**
- * Writes a chunk to the file.
+ * Writes a chunk to the file. The chunk number must be equal to the next
expected chunk number. Closes the file if the last chunk is
+ * written. Returns true if the last chunk is written.
*
- * @param chunk Chunk.
+ * @param chunk Chunk to write.
+ * @return True if the last chunk is written. False otherwise.
* @throws IOException If an I/O error occurs.
+ * @throws FileValidationException If the chunk number or file size is
invalid or expected file size is exceeded.
*/
- void write(FileChunkMessage chunk) throws IOException {
- chunks.add(chunk);
- while (!chunks.isEmpty() && chunks.peek().offset() ==
raf.getFilePointer()) {
- raf.write(chunks.poll().data());
+ boolean write(FileChunkMessage chunk) throws IOException {
+ if (chunk.number() != expectedNextChunkNumber) {
+ throw new FileValidationException("Chunk number mismatch: expected
" + expectedNextChunkNumber + ", actual " + chunk.number());
}
- if (fileSize != UNKNOWN_FILE_SIZE && raf.getFilePointer() > fileSize) {
- throw new FileValidationException("File size exceeded: expected "
+ fileSize + ", actual " + raf.getFilePointer());
- }
- }
+ lock.lock();
+ try {
+ stream.write(chunk.data());
Review Comment:
Ok, my bad, I mistaken this method for another, I apologize
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]