ibessonov commented on code in PR #2390:
URL: https://github.com/apache/ignite-3/pull/2390#discussion_r1295639315


##########
modules/file-transfer/src/integrationTest/java/org/apache/ignite/internal/network/file/ItFileTransferTest.java:
##########
@@ -488,10 +488,7 @@ void uploadFilesWhenDoNotHaveAccessToWrite() {
         );
 
         // Check that files transfer failed.
-        assertThat(
-                uploaded,
-                willThrow(FileTransferException.class, "Failed to upload 
files:")
-        );
+        assertThat(uploaded, willThrow(FileTransferException.class));

Review Comment:
   Why did you remove the error message check? It looked nice.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileReader.java:
##########
@@ -51,63 +61,59 @@ private ChunkedFileReader(String fileName, RandomAccessFile 
raf, int chunkSize)
      * @param file File.
      * @param chunkSize Chunk size.
      * @return Chunked file reader.
-     * @throws IOException If an I/O error occurs.
+     * @throws FileNotFoundException If the file does not exist.
      */
-    static ChunkedFileReader open(File file, int chunkSize) throws IOException 
{
-        return new ChunkedFileReader(file.getName(), new 
RandomAccessFile(file, "r"), chunkSize);
+    static ChunkedFileReader open(File file, int chunkSize) throws 
FileNotFoundException {
+        return new ChunkedFileReader(file.length(), new 
BufferedInputStream(new FileInputStream(file)), chunkSize);
     }
 
     /**
      * Returns {@code false} if there are no more chunks to read. Otherwise, 
returns {@code true}.
      *
      * @return {@code false} if there are no more chunks to read. Otherwise, 
returns {@code true}.
      */
-    boolean hasNextChunk() throws IOException {
-        return raf.getFilePointer() != raf.length();
+    boolean hasNextChunk() {
+        return offset < length;
     }
 
     /**
-     * Reads the next chunk.
+     * Reads the next chunk. If there are no more chunks to read, throws an 
exception. If the last chunk is read successfully, closes the
+     * file.
      *
      * @return Chunk data.
+     * @throws IllegalStateException If there are no more chunks to read.
      * @throws IOException If an I/O error occurs.
      */
     byte[] readNextChunk() throws IOException {
         if (!hasNextChunk()) {
-            throw new IOException("No more chunks to read");
+            throw new IllegalStateException("No more chunks to read");
         }
 
-        int toRead = (int) Math.min(chunkSize, length() - offset());
+        int toRead = (int) Math.min(chunkSize, length - offset);
         byte[] data = new byte[toRead];
-        raf.read(data);
-        return data;
-    }
+        stream.read(data);
+        offset += toRead;
+        nextChunkNumber++;
 
-    /**
-     * Returns the file name.
-     *
-     * @return File name.
-     */
-    String fileName() {
-        return fileName;
-    }
+        if (offset == length) {
+            stream.close();
+        }
 
-    /**
-     * Returns the current chunk offset.
-     *
-     * @return Chunk size.
-     */
-    long offset() throws IOException {
-        return raf.getFilePointer();
+        return data;
     }
 
     /**
-     * Returns the file length.
+     * Returns the number of the next chunk to read.
      *
-     * @return File length.
+     * @return The number of the next chunk to read.
+     * @throws IllegalStateException If there are no more chunks to read.
      */
-    long length() throws IOException {
-        return raf.length();
+    int nextChunkNumber() {
+        if (!hasNextChunk()) {
+            throw new IllegalStateException("No more chunks to read");

Review Comment:
   Same here: this is one of the most widely misused exceptions out there.
   Why do we even need the exception in this method in the first place?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -242,87 +249,104 @@ private void processUploadRequest(FileUploadRequest 
message, String senderConsis
         CompletableFuture<List<Path>> uploadedFiles = 
directoryFuture.thenCompose(directory -> fileReceiver.registerTransfer(
                 senderConsistentId,
                 transferId,
+                message.headers(),
                 directory
         )).whenComplete((v, e) -> {
             if (e != null) {
                 LOG.error("Failed to register file transfer. Transfer ID: {}. 
Metadata: {}", e, transferId, identifier);
             }
         });
 
-        runAsync(() -> fileReceiver.receiveFileHeaders(transferId, 
message.headers()), executorService)
-                .handle((v, e) -> {
-                    FileUploadResponse response = 
messageFactory.fileUploadResponse()
-                            .transferId(transferId)
-                            .error(e != null ? buildError(new 
FileTransferException("Failed to receive file headers", e)) : null)
-                            .build();
+        FileTransferInitResponse response = 
messageFactory.fileTransferInitResponse().build();
 
-                    return messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
-                })
-                .thenCompose(Function.identity())
+        messagingService.respond(senderConsistentId, FILE_TRANSFER_CHANNEL, 
response, correlationId)
                 .whenComplete((v, e) -> {
                     if (e != null) {
-                        LOG.error("Failed to send file upload response. 
Transfer ID: {}. Metadata: {}", e, transferId, identifier);
+                        LOG.error("Failed to send file transfer response. 
Transfer ID: {}. Metadata: {}", e, transferId, identifier);
                         fileReceiver.cancelTransfer(transferId, e);
                     }
                 })
-                .thenCompose(ignored -> uploadedFiles)
-                .thenCompose(files -> 
getFileConsumer(identifier).consume(identifier, files))
-                .whenComplete((v, e) -> {
+                .thenComposeAsync(ignored -> uploadedFiles, executorService)
+                .handleAsync((files, e) -> {
+                    if (e != null && 
transferIdToConsumer.containsKey(transferId)) {
+                        transferIdToConsumer.get(transferId).onError(e);
+                        return failedFuture(e);
+                    } else {
+                        FileConsumer<Identifier> consumer = 
transferIdToConsumer.containsKey(transferId)
+                                ? transferIdToConsumer.get(transferId)
+                                : getFileConsumer(identifier);
+                        return consumer.consume(identifier, files);
+                    }
+                }, executorService)
+                .thenComposeAsync(it -> it, executorService)

Review Comment:
   You could use `identity()`
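
A minimal sketch of this suggestion, assuming `identity()` refers to `java.util.function.Function.identity()` (the removed code used `Function.identity()` in the same position). The class and method names are hypothetical and only illustrate flattening a nested future:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical demo class, not part of the PR.
final class IdentityComposeExample {
    /** Flattens a future of a future, equivalent to thenCompose(it -> it). */
    static <T> CompletableFuture<T> flatten(CompletableFuture<CompletableFuture<T>> nested) {
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<CompletableFuture<String>> nested =
                CompletableFuture.completedFuture(CompletableFuture.completedFuture("files consumed"));

        System.out.println(flatten(nested).join());
    }
}
```

The same argument works with `thenComposeAsync(Function.identity(), executor)` when the continuation has to run on a specific executor.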



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -242,87 +249,104 @@ private void processUploadRequest(FileUploadRequest 
message, String senderConsis
         CompletableFuture<List<Path>> uploadedFiles = 
directoryFuture.thenCompose(directory -> fileReceiver.registerTransfer(
                 senderConsistentId,
                 transferId,
+                message.headers(),
                 directory
         )).whenComplete((v, e) -> {
             if (e != null) {
                 LOG.error("Failed to register file transfer. Transfer ID: {}. 
Metadata: {}", e, transferId, identifier);
             }
         });
 
-        runAsync(() -> fileReceiver.receiveFileHeaders(transferId, 
message.headers()), executorService)
-                .handle((v, e) -> {
-                    FileUploadResponse response = 
messageFactory.fileUploadResponse()
-                            .transferId(transferId)
-                            .error(e != null ? buildError(new 
FileTransferException("Failed to receive file headers", e)) : null)
-                            .build();
+        FileTransferInitResponse response = 
messageFactory.fileTransferInitResponse().build();
 
-                    return messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
-                })
-                .thenCompose(Function.identity())
+        messagingService.respond(senderConsistentId, FILE_TRANSFER_CHANNEL, 
response, correlationId)
                 .whenComplete((v, e) -> {
                     if (e != null) {
-                        LOG.error("Failed to send file upload response. 
Transfer ID: {}. Metadata: {}", e, transferId, identifier);
+                        LOG.error("Failed to send file transfer response. 
Transfer ID: {}. Metadata: {}", e, transferId, identifier);
                         fileReceiver.cancelTransfer(transferId, e);
                     }
                 })
-                .thenCompose(ignored -> uploadedFiles)
-                .thenCompose(files -> 
getFileConsumer(identifier).consume(identifier, files))
-                .whenComplete((v, e) -> {
+                .thenComposeAsync(ignored -> uploadedFiles, executorService)
+                .handleAsync((files, e) -> {
+                    if (e != null && 
transferIdToConsumer.containsKey(transferId)) {

Review Comment:
   Please add some comments. All these procedures are hard to understand.



##########
modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/FileSenderTest.java:
##########
@@ -106,25 +116,61 @@ void sendMultipleFiles() {
     }
 
     @Test
-    void exceptionIsThrownIfFileTransferFailed() {
+    void exceptionIsThrownWhenInvokeReturnException() {
         // Setup messaging service to fail on second file transfer.
         AtomicInteger count = new AtomicInteger();
-        given(messagingService.send(anyString(), any(), 
any())).will(invocation -> {
+        doAnswer(invocation -> {
             if (count.incrementAndGet() == 2) {
                 return failedFuture(new RuntimeException("Test exception"));
             } else {
-                return completedFuture(null);
+                return 
completedFuture(messageFactory.fileChunkResponse().build());
             }
+        })
+                .when(messagingService)
+                .invoke(anyString(), eq(Channel.FILE_TRANSFER_CHANNEL), 
any(FileChunkMessage.class), eq(RESPONSE_TIMEOUT));
 
-        });
+        // When.
+        Path randomFile = FileGenerator.randomFile(workDir, CHUNK_SIZE * 5);
+        UUID transferId = UUID.randomUUID();
+        FileSender sender = new FileSender(
+                CHUNK_SIZE,
+                new Semaphore(4),
+                RESPONSE_TIMEOUT, messagingService,
+                Executors.newSingleThreadExecutor()

Review Comment:
   Where's the code that stops these executors?
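
One way to address this, sketched in plain Java: keep a reference to the executor and shut it down once the work is done. The class and method names below are hypothetical. In a JUnit 5 test the shutdown would more typically live in an `@AfterEach` method operating on a field.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the PR.
final class ExecutorCleanupExample {
    /** Runs the task on a dedicated executor and always shuts the executor down. */
    static boolean runAndShutDown(Runnable task) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(task);
        } finally {
            // Stops accepting new tasks; tasks already submitted still run.
            executor.shutdown();
        }
        // Returns true if the executor terminated within the timeout.
        return executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        boolean terminated = runAndShutDown(() -> System.out.println("sending chunks"));
        System.out.println("executor terminated: " + terminated);
    }
}
```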



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -242,87 +249,104 @@ private void processUploadRequest(FileUploadRequest 
message, String senderConsis
         CompletableFuture<List<Path>> uploadedFiles = 
directoryFuture.thenCompose(directory -> fileReceiver.registerTransfer(
                 senderConsistentId,
                 transferId,
+                message.headers(),
                 directory
         )).whenComplete((v, e) -> {
             if (e != null) {
                 LOG.error("Failed to register file transfer. Transfer ID: {}. 
Metadata: {}", e, transferId, identifier);
             }
         });
 
-        runAsync(() -> fileReceiver.receiveFileHeaders(transferId, 
message.headers()), executorService)
-                .handle((v, e) -> {
-                    FileUploadResponse response = 
messageFactory.fileUploadResponse()
-                            .transferId(transferId)
-                            .error(e != null ? buildError(new 
FileTransferException("Failed to receive file headers", e)) : null)
-                            .build();
+        FileTransferInitResponse response = 
messageFactory.fileTransferInitResponse().build();
 
-                    return messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
-                })
-                .thenCompose(Function.identity())
+        messagingService.respond(senderConsistentId, FILE_TRANSFER_CHANNEL, 
response, correlationId)
                 .whenComplete((v, e) -> {
                     if (e != null) {
-                        LOG.error("Failed to send file upload response. 
Transfer ID: {}. Metadata: {}", e, transferId, identifier);
+                        LOG.error("Failed to send file transfer response. 
Transfer ID: {}. Metadata: {}", e, transferId, identifier);
                         fileReceiver.cancelTransfer(transferId, e);
                     }
                 })
-                .thenCompose(ignored -> uploadedFiles)
-                .thenCompose(files -> 
getFileConsumer(identifier).consume(identifier, files))
-                .whenComplete((v, e) -> {
+                .thenComposeAsync(ignored -> uploadedFiles, executorService)
+                .handleAsync((files, e) -> {
+                    if (e != null && 
transferIdToConsumer.containsKey(transferId)) {
+                        transferIdToConsumer.get(transferId).onError(e);
+                        return failedFuture(e);
+                    } else {
+                        FileConsumer<Identifier> consumer = 
transferIdToConsumer.containsKey(transferId)
+                                ? transferIdToConsumer.get(transferId)
+                                : getFileConsumer(identifier);
+                        return consumer.consume(identifier, files);
+                    }
+                }, executorService)
+                .thenComposeAsync(it -> it, executorService)
+                .whenCompleteAsync((v, e) -> {
                     if (e != null) {
                         LOG.error(
-                                "Failed to handle file upload. Transfer ID: 
{}. Metadata: {}",
+                                "Failed to handle file transfer. Transfer ID: 
{}. Metadata: {}",
                                 e,
                                 transferId,
                                 identifier
                         );
                     }
 
                     directoryFuture.thenAccept(IgniteUtils::deleteIfExists);
-                });
+                }, executorService);
     }
 
     private void processDownloadRequest(FileDownloadRequest message, String 
senderConsistentId, Long correlationId) {
         supplyAsync(() -> getFileProvider(message.identifier()), 
executorService)
                 .thenCompose(provider -> provider.files(message.identifier()))
                 .whenComplete((files, e) -> {
                     if (e != null) {
-                        LOG.error("Failed to get files for download. Metadata: 
{}", message.identifier(), e);
+                        LOG.error("Failed to get files for download. Metadata: 
{}", e, message.identifier());
 
                         FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
-                                .error(buildError(e))
+                                .error(fromThrowable(messageFactory, e))
                                 .build();
 
                         messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
                     } else if (files.isEmpty()) {
                         LOG.warn("No files to download. Metadata: {}", 
message.identifier());
 
                         FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
-                                .error(buildError(new 
FileTransferException("No files to download")))
+                                .error(fromThrowable(messageFactory, new 
FileTransferException("No files to download")))
                                 .build();
 
                         messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
                     } else {
                         FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
-                                .headers(fromPaths(messageFactory, files))
                                 .build();
 
                         messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId)
-                                .thenComposeAsync(v -> 
sendFiles(senderConsistentId, message.transferId(), files), executorService);
+                                .thenComposeAsync(v -> {
+                                    return 
transferFilesToNode(senderConsistentId, message.transferId(), 
message.identifier(), files);
+                                }, executorService);
                     }
                 });
     }
 
-    private void processFileChunkMessage(FileChunkMessage message) {
-        runAsync(() -> fileReceiver.receiveFileChunk(message), 
executorService);
+    private void processFileChunkMessage(FileChunkMessage message, String 
senderConsistentId, long correlationId) {
+        runAsync(() -> fileReceiver.receiveFileChunk(message), executorService)
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to receive file chunk. Transfer ID: 
{}", e, message.transferId());

Review Comment:
   By the way, I don't think that this is how we should format our log 
messages. Usually we have all the extra data in brackets, not as a separate 
sentence. This comment applies to a lot of your log messages.
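
A small illustration of the difference, assuming the bracketed style means a single `[key=value, ...]` group after the message. The exact project convention is an assumption here, and `render` is only a hypothetical stand-in for a logger's `{}` substitution:

```java
// Hypothetical demo class, not part of the PR.
final class LogFormatExample {
    // Style used in the PR: extra data appended as a separate sentence.
    static final String SENTENCE_STYLE = "Failed to receive file chunk. Transfer ID: {}";

    // Assumed bracketed style: the message first, then all context in one bracketed group.
    static final String BRACKET_STYLE = "Failed to receive file chunk [transferId={}]";

    /** Replaces each "{}" with the next argument, in order. Demo only. */
    static String render(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int idx = template.indexOf("{}", from);
            if (idx < 0) {
                break;
            }
            sb.append(template, from, idx).append(arg);
            from = idx + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    public static void main(String[] args) {
        System.out.println(render(SENTENCE_STYLE, "42"));
        System.out.println(render(BRACKET_STYLE, "42"));
    }
}
```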



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileWriter.java:
##########
@@ -17,99 +17,87 @@
 
 package org.apache.ignite.internal.network.file;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Path;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import 
org.apache.ignite.internal.network.file.exception.FileValidationException;
 import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
 
 /**
- * Chunked file writer. Writes chunks in order. If a chunk is not in order, it 
is stored in a queue. When the next chunk is written, the
- * queues are checked for the next chunk. If the next chunk is found, it is 
written to the file and removed from the queue. If the next
- * chunk is not found, the file is not written to. The writer is not 
thread-safe.
+ * Chunked file writer. Writes chunks to a file. Checks that the file size and 
chunk numbers are valid.
  */
 class ChunkedFileWriter implements AutoCloseable {
-    private static final int UNKNOWN_FILE_SIZE = -1;
+    private final BufferedOutputStream stream;
 
-    private final RandomAccessFile raf;
+    private final long expectedFileLength;
 
-    private long fileSize;
+    private long bytesWritten = 0;
 
-    private final Queue<FileChunkMessage> chunks = new 
PriorityQueue<>(FileChunkMessage.COMPARATOR);
-
-    private ChunkedFileWriter(RandomAccessFile raf, long fileSize) {
-        if (fileSize < UNKNOWN_FILE_SIZE) {
-            throw new IllegalArgumentException("File size must be 
non-negative");
-        }
-
-        this.raf = raf;
-        this.fileSize = fileSize;
-    }
+    private int expectedNextChunkNumber = 0;
 
     /**
-     * Opens a file with unknown size for writing.
-     *
-     * @param path File path.
-     * @return Chunked file writer.
-     * @throws FileNotFoundException If the file is not found.
+     * Lock to synchronize access to the file. We don't write to the file 
simultaneously, but the next chunk number and bytes written can be
+     * updated from different threads.
      */
-    static ChunkedFileWriter open(Path path) throws FileNotFoundException {
-        return new ChunkedFileWriter(new RandomAccessFile(path.toFile(), 
"rw"), UNKNOWN_FILE_SIZE);
+    private final Lock lock = new ReentrantLock();
+
+    private ChunkedFileWriter(BufferedOutputStream stream, long 
expectedFileLength) {
+        this.stream = stream;
+        this.expectedFileLength = expectedFileLength;
     }
 
     /**
      * Opens a file with known size for writing.
      *
-     * @param path File path.
-     * @param fileSize File size. If the file size is unknown, pass {@link 
#UNKNOWN_FILE_SIZE}.
+     * @param file File path.
+     * @param expectedFileLength Expected file size.
      * @return Chunked file writer.
      * @throws FileNotFoundException If the file is not found.
      */
-    static ChunkedFileWriter open(Path path, long fileSize) throws 
FileNotFoundException {
-        return new ChunkedFileWriter(new RandomAccessFile(path.toFile(), 
"rw"), fileSize);
+    static ChunkedFileWriter open(File file, long expectedFileLength) throws 
IOException {
+        return new ChunkedFileWriter(new BufferedOutputStream(new 
FileOutputStream(file)), expectedFileLength);
     }
 
     /**
-     * Writes a chunk to the file.
+     * Writes a chunk to the file. The chunk number must be equal to the next 
expected chunk number. Closes the file if the last chunk is
+     * written. Returns true if the last chunk is written.
      *
-     * @param chunk Chunk.
+     * @param chunk Chunk to write.
+     * @return True if the last chunk is written. False otherwise.
      * @throws IOException If an I/O error occurs.
+     * @throws FileValidationException If the chunk number or file size is 
invalid or expected file size is exceeded.
      */
-    void write(FileChunkMessage chunk) throws IOException {
-        chunks.add(chunk);
-        while (!chunks.isEmpty() && chunks.peek().offset() == 
raf.getFilePointer()) {
-            raf.write(chunks.poll().data());
+    boolean write(FileChunkMessage chunk) throws IOException {
+        if (chunk.number() != expectedNextChunkNumber) {
+            throw new FileValidationException("Chunk number mismatch: expected 
" + expectedNextChunkNumber + ", actual " + chunk.number());
         }
 
-        if (fileSize != UNKNOWN_FILE_SIZE && raf.getFilePointer() > fileSize) {
-            throw new FileValidationException("File size exceeded: expected " 
+ fileSize + ", actual " + raf.getFilePointer());
-        }
-    }
+        lock.lock();
+        try {
+            stream.write(chunk.data());

Review Comment:
   Same problem as with "read". As far as I know, there's no guarantee that all 
bytes will be written.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileReader.java:
##########
@@ -51,63 +61,59 @@ private ChunkedFileReader(String fileName, RandomAccessFile 
raf, int chunkSize)
      * @param file File.
      * @param chunkSize Chunk size.
      * @return Chunked file reader.
-     * @throws IOException If an I/O error occurs.
+     * @throws FileNotFoundException If the file does not exist.
      */
-    static ChunkedFileReader open(File file, int chunkSize) throws IOException 
{
-        return new ChunkedFileReader(file.getName(), new 
RandomAccessFile(file, "r"), chunkSize);
+    static ChunkedFileReader open(File file, int chunkSize) throws 
FileNotFoundException {
+        return new ChunkedFileReader(file.length(), new 
BufferedInputStream(new FileInputStream(file)), chunkSize);
     }
 
     /**
      * Returns {@code false} if there are no more chunks to read. Otherwise, 
returns {@code true}.
      *
      * @return {@code false} if there are no more chunks to read. Otherwise, 
returns {@code true}.
      */
-    boolean hasNextChunk() throws IOException {
-        return raf.getFilePointer() != raf.length();
+    boolean hasNextChunk() {
+        return offset < length;
     }
 
     /**
-     * Reads the next chunk.
+     * Reads the next chunk. If there are no more chunks to read, throws an 
exception. If the last chunk is read successfully, closes the
+     * file.
      *
      * @return Chunk data.
+     * @throws IllegalStateException If there are no more chunks to read.
      * @throws IOException If an I/O error occurs.
      */
     byte[] readNextChunk() throws IOException {
         if (!hasNextChunk()) {
-            throw new IOException("No more chunks to read");
+            throw new IllegalStateException("No more chunks to read");

Review Comment:
   My advice is not to use IllegalStateException. It doesn't help; IOException 
was fine.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -242,87 +249,104 @@ private void processUploadRequest(FileUploadRequest 
message, String senderConsis
         CompletableFuture<List<Path>> uploadedFiles = 
directoryFuture.thenCompose(directory -> fileReceiver.registerTransfer(
                 senderConsistentId,
                 transferId,
+                message.headers(),
                 directory
         )).whenComplete((v, e) -> {
             if (e != null) {
                 LOG.error("Failed to register file transfer. Transfer ID: {}. 
Metadata: {}", e, transferId, identifier);
             }
         });
 
-        runAsync(() -> fileReceiver.receiveFileHeaders(transferId, 
message.headers()), executorService)
-                .handle((v, e) -> {
-                    FileUploadResponse response = 
messageFactory.fileUploadResponse()
-                            .transferId(transferId)
-                            .error(e != null ? buildError(new 
FileTransferException("Failed to receive file headers", e)) : null)
-                            .build();
+        FileTransferInitResponse response = 
messageFactory.fileTransferInitResponse().build();
 
-                    return messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
-                })
-                .thenCompose(Function.identity())
+        messagingService.respond(senderConsistentId, FILE_TRANSFER_CHANNEL, 
response, correlationId)
                 .whenComplete((v, e) -> {
                     if (e != null) {
-                        LOG.error("Failed to send file upload response. 
Transfer ID: {}. Metadata: {}", e, transferId, identifier);
+                        LOG.error("Failed to send file transfer response. 
Transfer ID: {}. Metadata: {}", e, transferId, identifier);
                         fileReceiver.cancelTransfer(transferId, e);
                     }
                 })
-                .thenCompose(ignored -> uploadedFiles)
-                .thenCompose(files -> 
getFileConsumer(identifier).consume(identifier, files))
-                .whenComplete((v, e) -> {
+                .thenComposeAsync(ignored -> uploadedFiles, executorService)
+                .handleAsync((files, e) -> {
+                    if (e != null && 
transferIdToConsumer.containsKey(transferId)) {
+                        transferIdToConsumer.get(transferId).onError(e);
+                        return failedFuture(e);
+                    } else {
+                        FileConsumer<Identifier> consumer = 
transferIdToConsumer.containsKey(transferId)
+                                ? transferIdToConsumer.get(transferId)
+                                : getFileConsumer(identifier);
+                        return consumer.consume(identifier, files);
+                    }
+                }, executorService)
+                .thenComposeAsync(it -> it, executorService)
+                .whenCompleteAsync((v, e) -> {
                     if (e != null) {
                         LOG.error(
-                                "Failed to handle file upload. Transfer ID: 
{}. Metadata: {}",
+                                "Failed to handle file transfer. Transfer ID: 
{}. Metadata: {}",
                                 e,
                                 transferId,
                                 identifier
                         );
                     }
 
                     directoryFuture.thenAccept(IgniteUtils::deleteIfExists);
-                });
+                }, executorService);
     }
 
     private void processDownloadRequest(FileDownloadRequest message, String 
senderConsistentId, Long correlationId) {
         supplyAsync(() -> getFileProvider(message.identifier()), 
executorService)
                 .thenCompose(provider -> provider.files(message.identifier()))
                 .whenComplete((files, e) -> {
                     if (e != null) {
-                        LOG.error("Failed to get files for download. Metadata: 
{}", message.identifier(), e);
+                        LOG.error("Failed to get files for download. Metadata: 
{}", e, message.identifier());
 
                         FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
-                                .error(buildError(e))
+                                .error(fromThrowable(messageFactory, e))
                                 .build();
 
                         messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
                     } else if (files.isEmpty()) {
                         LOG.warn("No files to download. Metadata: {}", 
message.identifier());
 
                         FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
-                                .error(buildError(new 
FileTransferException("No files to download")))
+                                .error(fromThrowable(messageFactory, new 
FileTransferException("No files to download")))
                                 .build();
 
                         messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
                     } else {
                         FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
-                                .headers(fromPaths(messageFactory, files))
                                 .build();
 
                         messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId)
-                                .thenComposeAsync(v -> 
sendFiles(senderConsistentId, message.transferId(), files), executorService);
+                                .thenComposeAsync(v -> {
+                                    return 
transferFilesToNode(senderConsistentId, message.transferId(), 
message.identifier(), files);
+                                }, executorService);
                     }
                 });
     }
 
-    private void processFileChunkMessage(FileChunkMessage message) {
-        runAsync(() -> fileReceiver.receiveFileChunk(message), 
executorService);
+    private void processFileChunkMessage(FileChunkMessage message, String 
senderConsistentId, long correlationId) {
+        runAsync(() -> fileReceiver.receiveFileChunk(message), executorService)
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to receive file chunk. Transfer ID: 
{}", e, message.transferId());

Review Comment:
   What does it mean, "failed to receive"? You have received it, but weren't 
able to process the data. It's not the same.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileReader.java:
##########
@@ -51,63 +61,59 @@ private ChunkedFileReader(String fileName, RandomAccessFile 
raf, int chunkSize)
      * @param file File.
      * @param chunkSize Chunk size.
      * @return Chunked file reader.
-     * @throws IOException If an I/O error occurs.
+     * @throws FileNotFoundException If the file does not exist.
      */
-    static ChunkedFileReader open(File file, int chunkSize) throws IOException 
{
-        return new ChunkedFileReader(file.getName(), new 
RandomAccessFile(file, "r"), chunkSize);
+    static ChunkedFileReader open(File file, int chunkSize) throws 
FileNotFoundException {
+        return new ChunkedFileReader(file.length(), new 
BufferedInputStream(new FileInputStream(file)), chunkSize);
     }
 
     /**
      * Returns {@code false} if there are no more chunks to read. Otherwise, 
returns {@code true}.
      *
      * @return {@code false} if there are no more chunks to read. Otherwise, 
returns {@code true}.
      */
-    boolean hasNextChunk() throws IOException {
-        return raf.getFilePointer() != raf.length();
+    boolean hasNextChunk() {
+        return offset < length;
     }
 
     /**
-     * Reads the next chunk.
+     * Reads the next chunk. If there are no more chunks to read, throws an 
exception. If the last chunk is read successfully, closes the
+     * file.
      *
      * @return Chunk data.
+     * @throws IllegalStateException If there are no more chunks to read.
      * @throws IOException If an I/O error occurs.
      */
     byte[] readNextChunk() throws IOException {
         if (!hasNextChunk()) {
-            throw new IOException("No more chunks to read");
+            throw new IllegalStateException("No more chunks to read");
         }
 
-        int toRead = (int) Math.min(chunkSize, length() - offset());
+        int toRead = (int) Math.min(chunkSize, length - offset);
         byte[] data = new byte[toRead];
-        raf.read(data);
-        return data;
-    }
+        stream.read(data);

Review Comment:
   Does this call guarantee that it fills the entire array? I don't think so: 
`read` returns the actual number of bytes it read, which may be fewer than 
requested. Too bad I didn't notice it earlier. Always be careful with the 
read/write API, it's deceptively simple.
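
   A minimal sketch of one way to read a whole chunk, assuming a plain 
`InputStream`. The class and method names (`ChunkReadSketch`, `readChunk`) are 
hypothetical and not part of the PR. The loop keeps reading until the array is 
full and fails loudly if the stream ends early:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper, not taken from the PR.
final class ChunkReadSketch {
    /** Reads exactly {@code len} bytes, or throws if the stream ends first. */
    static byte[] readChunk(InputStream in, int len) throws IOException {
        byte[] data = new byte[len];
        int filled = 0;

        while (filled < len) {
            // read() may return fewer bytes than requested, so keep going.
            int n = in.read(data, filled, len - filled);

            if (n < 0) {
                throw new EOFException("Expected " + len + " bytes, got " + filled);
            }

            filled += n;
        }

        return data;
    }
}
```

   On Java 9 and later, `InputStream.readNBytes(byte[], int, int)` does the same 
looping internally and returns the number of bytes actually read, so the caller 
would still need to compare that number with the expected chunk size.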



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileWriter.java:
##########
@@ -17,99 +17,87 @@
 
 package org.apache.ignite.internal.network.file;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Path;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import 
org.apache.ignite.internal.network.file.exception.FileValidationException;
 import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
 
 /**
- * Chunked file writer. Writes chunks in order. If a chunk is not in order, it 
is stored in a queue. When the next chunk is written, the
- * queues are checked for the next chunk. If the next chunk is found, it is 
written to the file and removed from the queue. If the next
- * chunk is not found, the file is not written to. The writer is not 
thread-safe.
+ * Chunked file writer. Writes chunks to a file. Checks that the file size and 
chunk numbers are valid.
  */
 class ChunkedFileWriter implements AutoCloseable {
-    private static final int UNKNOWN_FILE_SIZE = -1;
+    private final BufferedOutputStream stream;
 
-    private final RandomAccessFile raf;
+    private final long expectedFileLength;
 
-    private long fileSize;
+    private long bytesWritten = 0;
 
-    private final Queue<FileChunkMessage> chunks = new 
PriorityQueue<>(FileChunkMessage.COMPARATOR);
-
-    private ChunkedFileWriter(RandomAccessFile raf, long fileSize) {
-        if (fileSize < UNKNOWN_FILE_SIZE) {
-            throw new IllegalArgumentException("File size must be 
non-negative");
-        }
-
-        this.raf = raf;
-        this.fileSize = fileSize;
-    }
+    private int expectedNextChunkNumber = 0;
 
     /**
-     * Opens a file with unknown size for writing.
-     *
-     * @param path File path.
-     * @return Chunked file writer.
-     * @throws FileNotFoundException If the file is not found.
+     * Lock to synchronize access to the file. We don't write to the file 
simultaneously, but the next chunk number and bytes written can be
+     * updated from different threads.
      */
-    static ChunkedFileWriter open(Path path) throws FileNotFoundException {
-        return new ChunkedFileWriter(new RandomAccessFile(path.toFile(), 
"rw"), UNKNOWN_FILE_SIZE);
+    private final Lock lock = new ReentrantLock();
+
+    private ChunkedFileWriter(BufferedOutputStream stream, long 
expectedFileLength) {
+        this.stream = stream;
+        this.expectedFileLength = expectedFileLength;
     }
 
     /**
      * Opens a file with known size for writing.
      *
-     * @param path File path.
-     * @param fileSize File size. If the file size is unknown, pass {@link 
#UNKNOWN_FILE_SIZE}.
+     * @param file File path.
+     * @param expectedFileLength Expected file size.
      * @return Chunked file writer.
      * @throws FileNotFoundException If the file is not found.
      */
-    static ChunkedFileWriter open(Path path, long fileSize) throws 
FileNotFoundException {
-        return new ChunkedFileWriter(new RandomAccessFile(path.toFile(), 
"rw"), fileSize);
+    static ChunkedFileWriter open(File file, long expectedFileLength) throws 
IOException {
+        return new ChunkedFileWriter(new BufferedOutputStream(new 
FileOutputStream(file)), expectedFileLength);
     }
 
     /**
-     * Writes a chunk to the file.
+     * Writes a chunk to the file. The chunk number must be equal to the next 
expected chunk number. Closes the file if the last chunk is
+     * written. Returns true if the last chunk is written.
      *
-     * @param chunk Chunk.
+     * @param chunk Chunk to write.
+     * @return True if the last chunk is written. False otherwise.
      * @throws IOException If an I/O error occurs.
+     * @throws FileValidationException If the chunk number or file size is 
invalid or expected file size is exceeded.
      */
-    void write(FileChunkMessage chunk) throws IOException {
-        chunks.add(chunk);
-        while (!chunks.isEmpty() && chunks.peek().offset() == 
raf.getFilePointer()) {
-            raf.write(chunks.poll().data());
+    boolean write(FileChunkMessage chunk) throws IOException {
+        if (chunk.number() != expectedNextChunkNumber) {
+            throw new FileValidationException("Chunk number mismatch: expected 
" + expectedNextChunkNumber + ", actual " + chunk.number());
         }
 
-        if (fileSize != UNKNOWN_FILE_SIZE && raf.getFilePointer() > fileSize) {
-            throw new FileValidationException("File size exceeded: expected " 
+ fileSize + ", actual " + raf.getFilePointer());
-        }
-    }
+        lock.lock();
+        try {
+            stream.write(chunk.data());
+            expectedNextChunkNumber++;
+            bytesWritten += chunk.data().length;
 
-    /**
-     * Checks if the file is finished.
-     *
-     * @return {@code True} if the file is finished.
-     * @throws IOException If an I/O error occurs.
-     */
-    boolean isFinished() throws IOException {
-        return raf.length() == fileSize;
-    }
+            if (bytesWritten > expectedFileLength) {
+                throw new FileValidationException("File size mismatch: 
expected " + expectedFileLength + ", actual " + bytesWritten);
+            }
 
-    /**
-     * Sets the file size.
-     *
-     * @param fileSize File size.
-     */
-    void fileSize(long fileSize) {
-        if (fileSize < 0) {
-            throw new IllegalArgumentException("File size must be 
non-negative");
-        }
+            if (bytesWritten == expectedFileLength) {
+                stream.flush();

Review Comment:
   What's this for?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileWriter.java:
##########
@@ -17,99 +17,87 @@
 
 package org.apache.ignite.internal.network.file;
 
+import java.io.BufferedOutputStream;
+import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Path;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import 
org.apache.ignite.internal.network.file.exception.FileValidationException;
 import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
 
 /**
- * Chunked file writer. Writes chunks in order. If a chunk is not in order, it 
is stored in a queue. When the next chunk is written, the
- * queues are checked for the next chunk. If the next chunk is found, it is 
written to the file and removed from the queue. If the next
- * chunk is not found, the file is not written to. The writer is not 
thread-safe.
+ * Chunked file writer. Writes chunks to a file. Checks that the file size and 
chunk numbers are valid.
  */
 class ChunkedFileWriter implements AutoCloseable {
-    private static final int UNKNOWN_FILE_SIZE = -1;
+    private final BufferedOutputStream stream;
 
-    private final RandomAccessFile raf;
+    private final long expectedFileLength;
 
-    private long fileSize;
+    private long bytesWritten = 0;
 
-    private final Queue<FileChunkMessage> chunks = new 
PriorityQueue<>(FileChunkMessage.COMPARATOR);
-
-    private ChunkedFileWriter(RandomAccessFile raf, long fileSize) {
-        if (fileSize < UNKNOWN_FILE_SIZE) {
-            throw new IllegalArgumentException("File size must be 
non-negative");
-        }
-
-        this.raf = raf;
-        this.fileSize = fileSize;
-    }
+    private int expectedNextChunkNumber = 0;
 
     /**
-     * Opens a file with unknown size for writing.
-     *
-     * @param path File path.
-     * @return Chunked file writer.
-     * @throws FileNotFoundException If the file is not found.
+     * Lock to synchronize access to the file. We don't write to the file 
simultaneously, but the next chunk number and bytes written can be
+     * updated from different threads.
      */
-    static ChunkedFileWriter open(Path path) throws FileNotFoundException {
-        return new ChunkedFileWriter(new RandomAccessFile(path.toFile(), 
"rw"), UNKNOWN_FILE_SIZE);
+    private final Lock lock = new ReentrantLock();
+
+    private ChunkedFileWriter(BufferedOutputStream stream, long 
expectedFileLength) {
+        this.stream = stream;
+        this.expectedFileLength = expectedFileLength;
     }
 
     /**
      * Opens a file with known size for writing.
      *
-     * @param path File path.
-     * @param fileSize File size. If the file size is unknown, pass {@link 
#UNKNOWN_FILE_SIZE}.
+     * @param file File path.
+     * @param expectedFileLength Expected file size.
      * @return Chunked file writer.
      * @throws FileNotFoundException If the file is not found.
      */
-    static ChunkedFileWriter open(Path path, long fileSize) throws 
FileNotFoundException {
-        return new ChunkedFileWriter(new RandomAccessFile(path.toFile(), 
"rw"), fileSize);
+    static ChunkedFileWriter open(File file, long expectedFileLength) throws 
IOException {
+        return new ChunkedFileWriter(new BufferedOutputStream(new 
FileOutputStream(file)), expectedFileLength);
     }
 
     /**
-     * Writes a chunk to the file.
+     * Writes a chunk to the file. The chunk number must be equal to the next 
expected chunk number. Closes the file if the last chunk is
+     * written. Returns true if the last chunk is written.
      *
-     * @param chunk Chunk.
+     * @param chunk Chunk to write.
+     * @return True if the last chunk is written. False otherwise.
      * @throws IOException If an I/O error occurs.
+     * @throws FileValidationException If the chunk number or file size is 
invalid or expected file size is exceeded.
      */
-    void write(FileChunkMessage chunk) throws IOException {
-        chunks.add(chunk);
-        while (!chunks.isEmpty() && chunks.peek().offset() == 
raf.getFilePointer()) {
-            raf.write(chunks.poll().data());
+    boolean write(FileChunkMessage chunk) throws IOException {
+        if (chunk.number() != expectedNextChunkNumber) {

Review Comment:
   Maybe this check should be inside of the try/finally block
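
   A rough sketch of what that could look like, so that the comparison and the 
increment of the expected chunk number happen under the same lock. 
`OrderedChunkCounter` is a hypothetical stand-in for the writer, and 
`IllegalStateException` stands in for `FileValidationException`:

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for ChunkedFileWriter; only the ordering logic is shown.
final class OrderedChunkCounter {
    private final Lock lock = new ReentrantLock();

    private int expectedNextChunkNumber = 0;

    /** Validates the chunk number and advances the counter under one lock. */
    void accept(int chunkNumber) {
        lock.lock();
        try {
            // The check reads the same field that is updated below,
            // so it is done while holding the lock.
            if (chunkNumber != expectedNextChunkNumber) {
                throw new IllegalStateException(
                        "Chunk number mismatch: expected " + expectedNextChunkNumber + ", actual " + chunkNumber);
            }

            // The actual write to the stream would go here.
            expectedNextChunkNumber++;
        } finally {
            // Released on both the normal and the exceptional path.
            lock.unlock();
        }
    }

    /** Returns the next expected chunk number. */
    int expected() {
        lock.lock();
        try {
            return expectedNextChunkNumber;
        } finally {
            lock.unlock();
        }
    }
}
```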



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -370,7 +394,7 @@ public <M extends Identifier> void addFileConsumer(
             Class<M> identifier,
             FileConsumer<M> consumer
     ) {
-        metadataToHandler.compute(
+        metadataToConsumer.compute(

Review Comment:
   Why did you rename "handler" to "consumer"? "Consumer" is too broad a term.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -112,7 +113,12 @@ public class FileTransferServiceImpl implements 
FileTransferService {
     /**
      * Map of file handlers.
      */
-    private final Map<Short, FileConsumer<Identifier>> metadataToHandler = new 
ConcurrentHashMap<>();
+    private final Map<Short, FileConsumer<Identifier>> metadataToConsumer = 
new ConcurrentHashMap<>();
+
+    /**
+     * Map of download requests consumers.
+     */
+    private final Map<UUID, DownloadRequestConsumer> transferIdToConsumer = 
new ConcurrentHashMap<>();

Review Comment:
   Why "consumer" instead of "handler"?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileSender.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
+import static 
org.apache.ignite.internal.network.file.Channel.FILE_TRANSFER_CHANNEL;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.network.file.exception.FileTransferException;
+import org.apache.ignite.network.MessagingService;
+
+/**
+ * A class that sends files to a node. It uses a rate limiter to limit the 
bandwidth used. It also uses a thread pool to send the files in
+ * parallel.
+ */
+class FileSender {
+    private final int chunkSize;
+
+    private final Semaphore rateLimiter;

Review Comment:
   Can you reflect this in the javadoc?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -390,75 +414,64 @@ public CompletableFuture<List<Path>> download(String 
sourceNodeConsistentId, Ide
                 .identifier(identifier)
                 .build();
 
-        try {
-            Path directory = createTransferDirectory(transferId);
-
-            CompletableFuture<List<Path>> downloadedFiles = 
fileReceiver.registerTransfer(
-                            sourceNodeConsistentId,
-                            transferId,
-                            directory
-                    )
-                    .thenApply(ignored -> {
-                        try {
-                            IgniteUtils.deleteIfExists(targetDir);
-
-                            Files.move(directory, targetDir, 
StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
-
-                            try (Stream<Path> stream = Files.list(targetDir)) {
-                                return stream.collect(Collectors.toList());
-                            }
-                        } catch (IOException e) {
-                            throw new FileTransferException("Failed to move 
downloaded files to target directory", e);
-                        }
-                    })
-                    .whenComplete((v, e) -> {
-                        if (e != null) {
-                            LOG.error("Failed to download files. Identifier: 
{}", identifier, e);
-                        }
-
-                        IgniteUtils.deleteIfExists(directory);
-                    });
-
-            return messagingService.invoke(sourceNodeConsistentId, 
FILE_TRANSFER_CHANNEL, downloadRequest, responseTimeout)
-                    .thenApply(FileDownloadResponse.class::cast)
-                    .whenCompleteAsync((response, e) -> {
-                        if (e != null) {
-                            fileReceiver.cancelTransfer(transferId, e);
-                        } else if (response.error() != null) {
-                            fileReceiver.cancelTransfer(transferId, 
buildException(response.error()));
-                        } else {
-                            fileReceiver.receiveFileHeaders(transferId, 
response.headers());
-                        }
-                    }, executorService)
-                    .thenCompose(v -> downloadedFiles);
-        } catch (Exception e) {
-            return failedFuture(e);
-        }
+        CompletableFuture<List<Path>> downloadedFiles = new 
CompletableFuture<List<Path>>()
+                .whenComplete((v, e) -> 
transferIdToConsumer.remove(transferId));
+
+        transferIdToConsumer.put(transferId, new 
DownloadRequestConsumer(downloadedFiles, targetDir));
+
+        messagingService.invoke(sourceNodeConsistentId, FILE_TRANSFER_CHANNEL, 
downloadRequest, responseTimeout)
+                .thenApply(FileDownloadResponse.class::cast)
+                .whenComplete((response, e) -> {
+                    if (e != null) {
+                        downloadedFiles.completeExceptionally(e);
+                    } else if (response.error() != null) {
+                        
downloadedFiles.completeExceptionally(toException(response.error()));
+                    }
+                });
+
+        return downloadedFiles;
     }
 
     @Override
     public CompletableFuture<Void> upload(String targetNodeConsistentId, 
Identifier identifier) {
         return getFileProvider(identifier).files(identifier)
-                .thenCompose(files -> {
-                    FileUploadRequest message = 
messageFactory.fileUploadRequest()
-                            .identifier(identifier)
-                            .headers(fromPaths(messageFactory, files))
-                            .build();
+                .thenCompose(files -> 
transferFilesToNode(targetNodeConsistentId, UUID.randomUUID(), identifier, 
files));
+    }
+
+    private CompletableFuture<Void> transferFilesToNode(
+            String targetNodeConsistentId,
+            UUID transferId,
+            Identifier identifier,
+            List<Path> paths
+    ) {
+        if (paths.isEmpty()) {
+            return failedFuture(new FileTransferException("No files to 
upload"));

Review Comment:
   Why is this a problem? What happens if the user tries to do this: will they 
see an error in the log? That's not what they would expect.
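
   If an empty file list is meant to be a no-op rather than an error, one option 
is to complete normally. This is only a sketch under that assumption; 
`EmptyTransferSketch`, `transfer`, and `doTransfer` are hypothetical names, and 
`doTransfer` is a placeholder for the real sending logic:

```java
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not the PR's implementation.
final class EmptyTransferSketch {
    static CompletableFuture<Void> transfer(List<Path> paths) {
        if (paths.isEmpty()) {
            // Nothing to send: complete successfully instead of failing.
            return CompletableFuture.completedFuture(null);
        }

        return doTransfer(paths);
    }

    // Placeholder for the real transfer; completes immediately here.
    private static CompletableFuture<Void> doTransfer(List<Path> paths) {
        return CompletableFuture.completedFuture(null);
    }
}
```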



##########
modules/file-transfer/src/test/java/org/apache/ignite/internal/network/file/FileSenderTest.java:
##########
@@ -170,7 +219,7 @@ void maxConcurrentRequestsLimitIsNotExceeded() {
         FileSender sender = new FileSender(
                 CHUNK_SIZE,
                 new Semaphore(maxConcurrentRequests),
-                messagingService,
+                RESPONSE_TIMEOUT, messagingService,

Review Comment:
   Every parameter should be on its own line. Please fix other such places as well.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileSender.java:
##########
@@ -197,9 +209,16 @@ private CompletableFuture<Void> sendMessagesFromStream(
     ) {
         try {
             if (stream.hasNextMessage() && !shouldBeCancelled.get()) {
-                return messagingService.send(receiverConsistentId, 
FILE_TRANSFER_CHANNEL, stream.nextMessage())
+                return messagingService.invoke(receiverConsistentId, 
FILE_TRANSFER_CHANNEL, stream.nextMessage(), responseTimeout)
+                        .thenApply(FileChunkResponse.class::cast)
                         .thenComposeAsync(
-                                ignored -> 
sendMessagesFromStream(receiverConsistentId, stream, shouldBeCancelled),
+                                ack -> {
+                                    if (ack.error() != null) {
+                                        return 
failedFuture(toException(ack.error()));

Review Comment:
   How will the receiver know that the operation is cancelled in case of an 
error? Does it have its own timeout?


