ibessonov commented on code in PR #2390:
URL: https://github.com/apache/ignite-3/pull/2390#discussion_r1289793714


##########
modules/core/src/main/java/org/apache/ignite/internal/util/FilesUtils.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Files utilities.
+ */
+public class FilesUtils {

Review Comment:
   Is this class only used in tests?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/Channel.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import org.apache.ignite.network.ChannelType;
+
+/**
+ * Channel types used by the file transfer protocol.
+ */
+final class Channel {
+    /**
+     * File transfer channel.
+     */
+    static final ChannelType FILE_TRANSFER_CHANNEL = 
ChannelType.register((short) 1, "FileTransfer");

Review Comment:
   It's still not changed; you simply closed my previous comment. How are we 
going to have two channels with the same ID in the main branch?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.network.file.Channel.FILE_TRANSFER_CHANNEL;
+import static org.apache.ignite.internal.network.file.MessagesUtils.getHeaders;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.network.configuration.FileTransferConfiguration;
+import org.apache.ignite.internal.network.file.exception.FileTransferException;
+import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import org.apache.ignite.internal.network.file.messages.FileTransferError;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferErrorMessage;
+import org.apache.ignite.internal.network.file.messages.FileTransferFactory;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadResponse;
+import org.apache.ignite.internal.network.file.messages.Identifier;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferService}.
+ */
+public class FileTransferServiceImpl implements FileTransferService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferServiceImpl.class);
+
+    /**
+     * Response timeout.
+     */
+    private final int reponseTimeout;
+
+    /**
+     * Topology service.
+     */
+    private final TopologyService topologyService;
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    /**
+     * Temporary directory for saving files.
+     */
+    private final Path tempDirectory;
+
+    /**
+     * File sender.
+     */
+    private final FileSender fileSender;
+
+    /**
+     * File receiver.
+     */
+    private final FileReceiver fileReceiver;
+
+    /**
+     * Map of file providers.
+     */
+    private final Map<Short, FileProvider<Identifier>> metadataToProvider = 
new ConcurrentHashMap<>();
+
+    /**
+     * Map of file handlers.
+     */
+    private final Map<Short, FileConsumer<Identifier>> metadataToHandler = new 
ConcurrentHashMap<>();
+
+    /**
+     * File transfer factory.
+     */
+    private final FileTransferFactory messageFactory = new 
FileTransferFactory();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param topologyService Topology service.
+     * @param messagingService Messaging service.
+     * @param configuration File transfer configuration.
+     * @param tempDirectory Temporary directory.
+     */
+    public FileTransferServiceImpl(
+            String nodeName,
+            TopologyService topologyService,
+            MessagingService messagingService,
+            FileTransferConfiguration configuration,
+            Path tempDirectory
+    ) {
+        this(
+                configuration.value().responseTimeout(),
+                topologyService,
+                messagingService,
+                tempDirectory,
+                new FileSender(
+                        nodeName,
+                        configuration.value().chunkSize(),
+                        configuration.value().senderThreadPoolSize(),
+                        new 
RateLimiterImpl(configuration.value().maxConcurrentRequests()),
+                        messagingService
+                ),
+                new FileReceiver(nodeName, 
configuration.value().receiverThreadPoolSize())
+        );
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param responseTimeout Response timeout.
+     * @param topologyService Topology service.
+     * @param messagingService Messaging service.
+     * @param tempDirectory Temporary directory.
+     * @param fileSender File sender.
+     * @param fileReceiver File receiver.
+     */
+    FileTransferServiceImpl(
+            int responseTimeout,
+            TopologyService topologyService,
+            MessagingService messagingService,
+            Path tempDirectory,
+            FileSender fileSender,
+            FileReceiver fileReceiver
+    ) {
+        this.reponseTimeout = responseTimeout;
+        this.topologyService = topologyService;
+        this.messagingService = messagingService;
+        this.tempDirectory = tempDirectory;
+        this.fileSender = fileSender;
+        this.fileReceiver = fileReceiver;
+    }
+
+    @Override
+    public void start() {
+        topologyService.addEventHandler(new TopologyEventHandler() {
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                fileReceiver.cancelTransfersFromSender(member.id());

Review Comment:
   Shouldn't you use `member.name()` here?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/RateLimiter.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+/**
+ * Rate limiter interface.
+ */
+public interface RateLimiter {

Review Comment:
   I don't think this interface is necessary at all.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/messages/FileHeader.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file.messages;
+
+import java.io.File;
+import java.nio.file.Path;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * File header.
+ */
+@Transferable(FileTransferMessageType.FILE_HEADER)
+public interface FileHeader extends NetworkMessage {
+    /**
+     * Returns the name of the file.
+     *
+     * @return File name.
+     */
+    String name();
+
+    /**
+     * Returns the size of the file in bytes.
+     *
+     * @return File size.
+     */
+    long length();
+
+    /**
+     * Creates a new {@link FileHeader} instance from the given {@link File}.
+     */
+    static FileHeader fromPath(FileTransferFactory factory, int id, Path path) 
{

Review Comment:
   Wait a minute, you don't even use this "id". Please remove it, then.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferServiceImpl.java:
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.network.file.Channel.FILE_TRANSFER_CHANNEL;
+import static org.apache.ignite.internal.network.file.MessagesUtils.getHeaders;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import 
org.apache.ignite.internal.network.configuration.FileTransferConfiguration;
+import org.apache.ignite.internal.network.file.exception.FileTransferException;
+import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import org.apache.ignite.internal.network.file.messages.FileTransferError;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferErrorMessage;
+import org.apache.ignite.internal.network.file.messages.FileTransferFactory;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadResponse;
+import org.apache.ignite.internal.network.file.messages.Identifier;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferService}.
+ */
+public class FileTransferServiceImpl implements FileTransferService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferServiceImpl.class);
+
+    /**
+     * Response timeout.
+     */
+    private final int reponseTimeout;
+
+    /**
+     * Topology service.
+     */
+    private final TopologyService topologyService;
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    /**
+     * Temporary directory for saving files.
+     */
+    private final Path tempDirectory;
+
+    /**
+     * File sender.
+     */
+    private final FileSender fileSender;
+
+    /**
+     * File receiver.
+     */
+    private final FileReceiver fileReceiver;
+
+    /**
+     * Map of file providers.
+     */
+    private final Map<Short, FileProvider<Identifier>> metadataToProvider = 
new ConcurrentHashMap<>();
+
+    /**
+     * Map of file handlers.
+     */
+    private final Map<Short, FileConsumer<Identifier>> metadataToHandler = new 
ConcurrentHashMap<>();
+
+    /**
+     * File transfer factory.
+     */
+    private final FileTransferFactory messageFactory = new 
FileTransferFactory();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeName Node name.
+     * @param topologyService Topology service.
+     * @param messagingService Messaging service.
+     * @param configuration File transfer configuration.
+     * @param tempDirectory Temporary directory.
+     */
+    public FileTransferServiceImpl(
+            String nodeName,
+            TopologyService topologyService,
+            MessagingService messagingService,
+            FileTransferConfiguration configuration,
+            Path tempDirectory
+    ) {
+        this(
+                configuration.value().responseTimeout(),
+                topologyService,
+                messagingService,
+                tempDirectory,
+                new FileSender(
+                        nodeName,
+                        configuration.value().chunkSize(),
+                        configuration.value().senderThreadPoolSize(),
+                        new 
RateLimiterImpl(configuration.value().maxConcurrentRequests()),
+                        messagingService
+                ),
+                new FileReceiver(nodeName, 
configuration.value().receiverThreadPoolSize())
+        );
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param responseTimeout Response timeout.
+     * @param topologyService Topology service.
+     * @param messagingService Messaging service.
+     * @param tempDirectory Temporary directory.
+     * @param fileSender File sender.
+     * @param fileReceiver File receiver.
+     */
+    FileTransferServiceImpl(
+            int responseTimeout,
+            TopologyService topologyService,
+            MessagingService messagingService,
+            Path tempDirectory,
+            FileSender fileSender,
+            FileReceiver fileReceiver
+    ) {
+        this.reponseTimeout = responseTimeout;
+        this.topologyService = topologyService;
+        this.messagingService = messagingService;
+        this.tempDirectory = tempDirectory;
+        this.fileSender = fileSender;
+        this.fileReceiver = fileReceiver;
+    }
+
+    @Override
+    public void start() {
+        topologyService.addEventHandler(new TopologyEventHandler() {
+            @Override
+            public void onDisappeared(ClusterNode member) {
+                fileReceiver.cancelTransfersFromSender(member.id());
+            }
+        });
+
+        messagingService.addMessageHandler(FileTransferMessageType.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof FileDownloadRequest) {
+                        processDownloadRequest((FileDownloadRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof FileUploadRequest) {
+                        processUploadRequest((FileUploadRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof FileTransferErrorMessage) {
+                        
fileReceiver.receiveFileTransferErrorMessage((FileTransferErrorMessage) 
message);
+                    } else if (message instanceof FileChunkMessage) {
+                        fileReceiver.receiveFileChunk((FileChunkMessage) 
message);
+                    } else {
+                        LOG.error("Unexpected message received: {}", message);
+                    }
+                });
+    }
+
+    private void processUploadRequest(FileUploadRequest message, String 
senderConsistentId, long correlationId) {
+        UUID transferId = UUID.randomUUID();
+
+        Path directory = createTransferDirectory(transferId);
+
+        CompletableFuture<List<Path>> uploadedFiles = 
fileReceiver.registerTransfer(
+                senderConsistentId,
+                transferId,
+                directory
+        );
+
+        fileReceiver.receiveFileHeaders(transferId, message.headers());
+
+        FileUploadResponse response = messageFactory.fileUploadResponse()
+                .transferId(transferId)
+                .build();
+
+        Identifier identifier = message.identifier();
+
+        messagingService.respond(senderConsistentId, FILE_TRANSFER_CHANNEL, 
response, correlationId)
+                .thenCompose(ignored -> {
+                    return uploadedFiles.thenCompose(files -> 
getFileConsumer(identifier).consume(identifier, files));
+                })
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error(
+                                "Failed to handle file upload. Transfer ID: 
{}. Metadata: {}",
+                                e,
+                                transferId,
+                                identifier
+                        );
+                    }
+
+                    IgniteUtils.deleteIfExists(directory);
+                });
+    }
+
+    private void processDownloadRequest(FileDownloadRequest message, String 
senderConsistentId, Long correlationId) {
+        getFileProvider(message.identifier()).files(message.identifier())
+                .whenComplete((files, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to get files for download. Metadata: 
{}", message.identifier(), e);
+
+                        FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
+                                .error(buildError(e))
+                                .build();
+
+                        messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
+                    } else if (files.isEmpty()) {
+                        LOG.warn("No files to download. Metadata: {}", 
message.identifier());
+
+                        FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
+                                .error(buildError(new 
FileTransferException("No files to download")))
+                                .build();
+
+                        messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId);
+                    } else {
+                        FileDownloadResponse response = 
messageFactory.fileDownloadResponse()
+                                .headers(getHeaders(messageFactory, files))
+                                .build();
+
+                        messagingService.respond(senderConsistentId, 
FILE_TRANSFER_CHANNEL, response, correlationId)
+                                .thenCompose(v -> 
sendFiles(senderConsistentId, message.transferId(), files));
+                    }
+                });
+    }
+
+    private CompletableFuture<Void> sendFiles(String targetNodeConsistentId, 
UUID transferId, List<Path> paths) {
+        return fileSender.send(targetNodeConsistentId, transferId, paths)
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to send files to node: {}, transfer 
id: {}",
+                                e,
+                                targetNodeConsistentId,
+                                transferId
+                        );
+
+                        FileTransferErrorMessage message = 
messageFactory.fileTransferErrorMessage()
+                                .transferId(transferId)
+                                .error(buildError(e))
+                                .build();
+
+                        messagingService.send(targetNodeConsistentId, 
FILE_TRANSFER_CHANNEL, message);
+                    }
+                });
+    }
+
+    @Override
+    public void stop() throws Exception {
+        IgniteUtils.closeAllManually(fileSender, fileReceiver);
+    }
+
+    @Override
+    public <M extends Identifier> void addFileProvider(
+            Class<M> identifier,
+            FileProvider<M> provider
+    ) {
+        metadataToProvider.compute(
+                getMessageType(identifier),
+                (k, v) -> {
+                    if (v != null) {
+                        throw new IllegalArgumentException("File provider for 
metadata " + identifier.getName() + " already exists");
+                    } else {
+                        return (FileProvider<Identifier>) provider;
+                    }
+                }
+        );
+    }
+
+    @Override
+    public <M extends Identifier> void addFileConsumer(
+            Class<M> identifier,
+            FileConsumer<M> consumer
+    ) {
+        metadataToHandler.compute(
+                getMessageType(identifier),
+                (k, v) -> {
+                    if (v != null) {
+                        throw new IllegalArgumentException("File handler for 
metadata " + identifier.getName() + " already exists");
+                    } else {
+                        return (FileConsumer<Identifier>) consumer;
+                    }
+                }
+        );
+    }
+
+    @Override
+    public CompletableFuture<List<Path>> download(String 
sourceNodeConsistentId, Identifier identifier, Path targetDir) {
+        UUID transferId = UUID.randomUUID();
+        FileDownloadRequest downloadRequest = 
messageFactory.fileDownloadRequest()
+                .transferId(transferId)
+                .identifier(identifier)
+                .build();
+
+        try {
+            Path directory = createTransferDirectory(transferId);
+
+            // Wrap in the completed future to avoid blocking.

Review Comment:
   Can you please elaborate?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileWriter.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Path;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import 
org.apache.ignite.internal.network.file.exception.FileValidationException;
+import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
+
+/**
+ * Chunked file writer. Writes chunks in order. If a chunk is not in order, it 
is stored in a queue. When the next chunk is written, the
+ * queues are checked for the next chunk. If the next chunk is found, it is 
written to the file and removed from the queue. If the next
+ * chunk is not found, the file is not written to. The writer is not 
thread-safe

Review Comment:
   ```suggestion
    * chunk is not found, the file is not written to. The writer is not 
thread-safe.
   ```



##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/PathMatcher.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+
+/**
+ * Matchers for {@link Path}.
+ */
+public final class PathMatcher {
+
+    private PathMatcher() {
+    }
+
+    /**
+     * Creates a matcher for matching file content and name.
+     *
+     * @param expectedFile Expected file.
+     * @return Matcher that checks if file has the same content and name as 
expected file.
+     */
+    public static Matcher<Path> hasSameContentAndName(Path expectedFile) {
+        return 
both(hasSameContent(expectedFile)).and(hasSameName(expectedFile));
+    }
+
+    /**
+     * Creates a matcher for matching file content.
+     *
+     * @param expectedFile Expected file.
+     * @return Matcher for matching file content.
+     */
+    public static Matcher<Path> hasSameContent(Path expectedFile) {
+        return hasContent(equalTo(readFile(expectedFile)));
+    }
+
+    /**
+     * Creates a matcher for matching file content.
+     *
+     * @param contentMatcher Matcher for matching file content.
+     * @return Matcher for matching file content.
+     */
+    public static Matcher<Path> hasContent(Matcher<byte[]> contentMatcher) {
+        return new FeatureMatcher<>(contentMatcher, "A file with content", 
"content") {
+            @Override
+            protected byte[] featureValueOf(Path actual) {
+                return readFile(actual);
+            }
+        };
+    }
+
+    /**
+     * Creates a matcher for matching file name.
+     *
+     * @param expectedFile Expected file.
+     * @return Matcher for matching file name.
+     */
+    public static Matcher<Path> hasSameName(Path expectedFile) {
+        return hasName(equalTo(expectedFile.getFileName().toString()));
+    }
+
+    /**
+     * Creates a matcher for matching file name.
+     *
+     * @param nameMatcher Matcher for matching file name.
+     * @return Matcher for matching file name.
+     */
+    public static Matcher<Path> hasName(Matcher<String> nameMatcher) {
+        return new FeatureMatcher<>(nameMatcher, "A file with name", "name") {
+            @Override
+            protected String featureValueOf(Path actual) {
+                return actual.getFileName().toString();
+            }
+        };
+    }
+
+    private static byte[] readFile(Path path) {
+        try {
+            return Files.readAllBytes(path);
+        } catch (IOException e) {
+            throw new RuntimeException("Could not read file content", e);
+        }
+    }
+
+}

Review Comment:
   ```suggestion
       }
   }
   ```



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferMessagesStream.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
+import org.apache.ignite.internal.network.file.messages.FileTransferFactory;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * A stream of messages to send file.
+ */
+public class FileTransferMessagesStream implements Iterable<FileChunkMessage>, 
AutoCloseable {
+    private final UUID transferId;
+
+    private final ChunkedFileReader reader;
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final FileTransferFactory messageFactory = new 
FileTransferFactory();
+
+    /**
+     * Creates a new stream of messages to send files.
+     *
+     * @param transferId the id of the stream.
+     * @param reader the reader of the file to send.
+     */
+    private FileTransferMessagesStream(
+            UUID transferId,
+            ChunkedFileReader reader
+    ) {
+        this.transferId = transferId;
+        this.reader = reader;
+    }
+
+    /**
+     * Creates a new stream of messages to send file.
+     *
+     * @param chunkSize the size of the chunks to send.
+     * @param transferId the id of the transfer.
+     * @param path the path of the file to send.
+     * @return a new stream of messages to send files.
+     * @throws IOException if an I/O error occurs.
+     */
+    public static FileTransferMessagesStream fromPath(
+            int chunkSize,
+            UUID transferId,
+            Path path
+    ) throws IOException {
+        if (chunkSize <= 0) {
+            throw new IllegalArgumentException("Chunk size must be positive");
+        }
+
+        return new FileTransferMessagesStream(transferId, 
ChunkedFileReader.open(path.toFile(), chunkSize));
+    }
+
+    /**
+     * Returns true if there are more messages to send.
+     *
+     * @return true if there are more messages to send.
+     */
+    boolean hasNextMessage() throws IOException {
+        // Check that the stream is not closed and the reader has more chunks.
+        return !closed.get() && reader.hasNextChunk();
+    }
+
+    /**
+     * Returns the next message to send.
+     *
+     * @return the next message to send.
+     * @throws IOException if an I/O error occurs.
+     * @throws IllegalStateException if there are no more messages to send.
+     */
+    FileChunkMessage nextMessage() throws IOException {
+        if (hasNextMessage()) {
+            return nextChunk();
+        } else {
+            throw new IllegalStateException("There are no more messages to 
send");
+        }
+    }
+
+    /**
+     * Returns the next chunk of the current file. Throws an exception if the 
current file is finished.
+     *
+     * @return the next chunk of the current file.
+     * @throws IOException if an I/O error occurs.
+     * @throws IllegalStateException if the current file is finished.
+     */
+    private FileChunkMessage nextChunk() throws IOException {
+        return messageFactory.fileChunkMessage()
+                .transferId(transferId)
+                .fileName(reader.fileName())
+                .offset(reader.offset())
+                .data(reader.readNextChunk())
+                .build();
+    }
+
+    /**
+     * Closes the stream.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            reader.close();
+        }
+    }
+
+    @NotNull

Review Comment:
   We don't use the `@NotNull` annotation; please remove it.



##########
modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java:
##########
@@ -327,6 +327,12 @@ public static class Network {
 
         /** Port is in use. */
         public static final int PORT_IN_USE_ERR = 
NETWORK_ERR_GROUP.registerErrorCode((short) 2);
+
+        /** File transfer error. */
+        public static final int FILE_TRANSFER_ERR = 
NETWORK_ERR_GROUP.registerErrorCode((short) 3);
+
+        /** File validation error. */
+        public static final int FILE_VALIDATION_ERR = 
NETWORK_ERR_GROUP.registerErrorCode((short) 4);

Review Comment:
   Exceptions are still in internal packages. Under what circumstances would a 
user get an exception with one of these error codes? I don't see any, so please 
move these error codes to a proper place.



##########
modules/file-transfer/src/integrationTest/java/org/apache/ignite/internal/network/file/ItFileTransferTest.java:
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.network.file.FileGenerator.randomFile;
+import static 
org.apache.ignite.internal.network.file.PathAssertions.namesAndContentEquals;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.network.configuration.FileTransferConfiguration;
+import org.apache.ignite.internal.network.file.TestCluster.Node;
+import org.apache.ignite.internal.network.file.exception.FileTransferException;
+import org.apache.ignite.internal.network.file.messages.FileTransferFactory;
+import org.apache.ignite.internal.network.file.messages.Identifier;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for file transferring.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItFileTransferTest {
+    private static final String DOWNLOADS_DIR = "downloads";
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration("mock.chunkSize=1024")
+    private FileTransferConfiguration configuration;
+
+    private TestCluster cluster;
+
+    private Path sourceDir;
+
+    private final FileTransferFactory messageFactory = new 
FileTransferFactory();
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws InterruptedException, IOException {
+        sourceDir = Files.createDirectories(workDir.resolve("source"));
+
+        cluster = new TestCluster(2, configuration, workDir, testInfo);
+        cluster.startAwait();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        cluster.shutdown();
+    }
+
+    @Test
+    void downloadToEmptyDir() {
+        // Generate files on the source node.
+        Node sourceNode = cluster.members.get(0);
+
+        int chunkSize = configuration.value().chunkSize();
+        Path file1 = randomFile(sourceDir, chunkSize);
+        Path file2 = randomFile(sourceDir, chunkSize + 1);
+        Path file3 = randomFile(sourceDir, chunkSize * 2);
+        Path file4 = randomFile(sourceDir, chunkSize * 2);
+
+        // Add file provider to the source node.
+        List<Path> files = List.of(file1, file2, file3, file4);
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> completedFuture(files)
+        );
+
+        // Download files on the target node from the source node.
+        Node targetNode = cluster.members.get(1);
+        CompletableFuture<List<Path>> downloadedFilesFuture = 
targetNode.fileTransferringService().download(
+                sourceNode.nodeName(),
+                messageFactory.identifier().build(),
+                targetNode.workDir().resolve(DOWNLOADS_DIR)
+        );
+
+        // Check that files were downloaded.
+        assertThat(
+                downloadedFilesFuture,
+                willBe(namesAndContentEquals(files))
+        );
+
+        // Check that files were downloaded to the correct directory.
+        assertDownloadsDirContainsFiles(targetNode, files);
+
+        // Check temporary files were deleted.
+        assertTemporaryFilesWereDeleted(targetNode);
+    }
+
+    @Test
+    void downloadToNonEmptyDir() throws IOException {
+        // Generate files on the source node.
+        Node sourceNode = cluster.members.get(0);
+
+        int chunkSize = configuration.value().chunkSize();
+        Path file1 = randomFile(sourceDir, chunkSize);
+        Path file2 = randomFile(sourceDir, chunkSize + 1);
+        Path file3 = randomFile(sourceDir, chunkSize * 2);
+        Path file4 = randomFile(sourceDir, chunkSize * 2);
+
+        // Add file provider to the source node.
+        List<Path> files = List.of(file1, file2, file3, file4);
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> completedFuture(files)
+        );
+
+        Node targetNode = cluster.members.get(1);
+
+        // Create downloads directory.
+        Path targetNodeDownloadsDir = 
targetNode.workDir().resolve(DOWNLOADS_DIR);
+        Files.createDirectories(targetNodeDownloadsDir);
+
+        // Create some files in the downloads directory.
+        Path file5 = randomFile(targetNodeDownloadsDir, chunkSize);
+        Path file6 = randomFile(targetNodeDownloadsDir, chunkSize + 1);
+
+        // Download files on the target node from the source node.
+        CompletableFuture<List<Path>> downloadedFilesFuture = 
targetNode.fileTransferringService().download(
+                sourceNode.nodeName(),
+                messageFactory.identifier().build(),
+                targetNodeDownloadsDir
+        );
+
+        // Check that files were downloaded.
+        assertThat(
+                downloadedFilesFuture,
+                willBe(namesAndContentEquals(files))
+        );
+
+        // Check that files were downloaded to the correct directory.
+        assertDownloadsDirContainsFiles(targetNode, files);
+
+        // Check temporary files were deleted.
+        assertTemporaryFilesWereDeleted(targetNode);
+
+        // Check that previous files in the downloads directory were deleted.
+        try (Stream<Path> stream = Files.list(targetNodeDownloadsDir)) {
+            List<String> filesInDownloadsDir = stream.map(Path::getFileName)
+                    .map(Path::toString)
+                    .collect(Collectors.toList());
+            assertThat(filesInDownloadsDir, 
not(containsInAnyOrder(file5.getFileName().toString(), 
file6.getFileName().toString())));
+        }
+    }
+
+    @Test
+    void downloadNonReadableFiles() {
+        // Generate files on the source node.
+        Node sourceNode = cluster.members.get(0);
+
+        int chunkSize = configuration.value().chunkSize();
+        Path file1 = randomFile(sourceDir, chunkSize - 1);
+        Path file2 = randomFile(sourceDir, chunkSize + 1);
+        Path file3 = randomFile(sourceDir, chunkSize * 2);
+
+        // Make file2 non-readable.
+        assertTrue(file2.toFile().setReadable(false));
+
+        // Add file provider to the source node.
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> completedFuture(List.of(file1, file2, file3))
+        );
+
+        // Download files on the target node from the source node.
+        Node targetNode = cluster.members.get(1);
+        CompletableFuture<List<Path>> download = 
targetNode.fileTransferringService().download(
+                sourceNode.nodeName(),
+                messageFactory.identifier().build(),
+                targetNode.workDir().resolve(DOWNLOADS_DIR)
+        );
+
+        // Check that files transfer failed.
+        assertThat(download, willThrow(FileTransferException.class, "Failed to 
create a file transfer stream"));
+
+        // Check temporary files were deleted.
+        assertTemporaryFilesWereDeleted(targetNode);
+    }
+
+    @Test
+    void downloadFilesWhenProviderThrowsException() {
+        // Add file provider to the source node that throws an exception.
+        Node sourceNode = cluster.members.get(0);
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> failedFuture(new RuntimeException("Test exception"))
+        );
+
+        // Download files on the target node from the source node.
+        Node targetNode = cluster.members.get(1);
+        CompletableFuture<List<Path>> download = 
targetNode.fileTransferringService().download(
+                sourceNode.nodeName(),
+                messageFactory.identifier().build(),
+                targetNode.workDir().resolve(DOWNLOADS_DIR)
+        );
+
+        // Check that files transfer failed.
+        assertThat(download, 
willThrowWithCauseOrSuppressed(FileTransferException.class));
+
+        // Check temporary files were deleted.
+        assertTemporaryFilesWereDeleted(targetNode);
+    }
+
+    @Test
+    void downloadFilesWhenDoNotHaveAccessToWrite() {
+        // Generate files on the source node.
+        Node sourceNode = cluster.members.get(0);
+
+        int chunkSize = configuration.value().chunkSize();
+        Path file1 = randomFile(sourceDir, 0);
+        Path file2 = randomFile(sourceDir, chunkSize + 1);
+        Path file3 = randomFile(sourceDir, chunkSize * 2);
+        Path file4 = randomFile(sourceDir, chunkSize * 2);
+
+        // Add file provider to the source node.
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> completedFuture(List.of(file1, file2, file3, file4))
+        );
+
+        // Make work directory not writable on the target node.
+        Node targetNode = cluster.members.get(1);
+        assertTrue(targetNode.workDir().toFile().setWritable(false));
+
+        // Download files on the target node from the source node.
+        CompletableFuture<List<Path>> downloadedFilesFuture = 
targetNode.fileTransferringService().download(
+                sourceNode.nodeName(),
+                messageFactory.identifier().build(),
+                targetNode.workDir().resolve(DOWNLOADS_DIR)
+        );
+
+        // Check that files transfer failed.
+        assertThat(
+                downloadedFilesFuture,
+                willThrowWithCauseOrSuppressed(AccessDeniedException.class)
+        );
+
+        // Check temporary files were deleted.
+        assertTemporaryFilesWereDeleted(targetNode);
+    }
+
+    @Test
+    void downloadEmptyFileList() {
+        // Set empty file list provider on the source node.
+        Node sourceNode = cluster.members.get(0);
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> completedFuture(List.of())
+        );
+
+        // Download files on the target node from the source node.
+        Node targetNode = cluster.members.get(1);
+        CompletableFuture<List<Path>> downloadedFiles = 
targetNode.fileTransferringService().download(
+                sourceNode.nodeName(),
+                messageFactory.identifier().build(),
+                targetNode.workDir().resolve(DOWNLOADS_DIR)
+
+        );
+
+        // Check that files transfer failed.
+        assertThat(downloadedFiles, willThrow(FileTransferException.class, "No 
files to download"));
+
+        // Check temporary files were deleted.
+        assertTemporaryFilesWereDeleted(targetNode);
+    }
+
+    @Test
+    void upload() {
+        // Generate files on the source node.
+        Node sourceNode = cluster.members.get(0);
+
+        int chunkSize = configuration.value().chunkSize();
+        Path file1 = randomFile(sourceDir, chunkSize);
+        Path file2 = randomFile(sourceDir, chunkSize - 1);
+        Path file3 = randomFile(sourceDir, chunkSize + 1);
+        Path file4 = randomFile(sourceDir, chunkSize * 2);
+
+        // Add file provider to the source node.
+        List<Path> files = List.of(file1, file2, file3, file4);
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> completedFuture(files)
+        );
+
+        Node targetNode = cluster.members.get(1);
+
+        // The future that will be completed when the consumer is called.
+        CompletableFuture<Void> uploadedFilesFuture = new 
CompletableFuture<>();
+        targetNode.fileTransferringService().addFileConsumer(Identifier.class, 
((metadata, uploadedFiles) -> {
+            return uploadedFilesFuture.completeAsync(() -> {
+                assertThat(uploadedFiles, namesAndContentEquals(files));
+                return null;
+            });
+        }));
+
+        // Upload files on the target node from the source node.
+        CompletableFuture<Void> upload = sourceNode.fileTransferringService()
+                .upload(targetNode.nodeName(), 
messageFactory.identifier().build());
+
+        // Check that transfer was successful.
+        assertThat(
+                upload,
+                CompletableFutureMatcher.willCompleteSuccessfully()
+        );
+
+        // Check the consumer was called.
+        assertThat(
+                uploadedFilesFuture,
+                CompletableFutureMatcher.willCompleteSuccessfully()
+        );
+
+        // Check temporary files were deleted.
+        await().until(() -> targetNode.workDir().toFile().listFiles().length 
== 0);
+    }
+
+    @Test
+    void uploadEmptyFileList() {
+        // Set empty file list provider on the source node.
+        Node sourceNode = cluster.members.get(0);
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> completedFuture(List.of())
+        );
+
+        // Upload files on the target node from the source node.
+        Node targetNode = cluster.members.get(1);
+        CompletableFuture<Void> uploaded = sourceNode.fileTransferringService()
+                .upload(targetNode.nodeName(), 
messageFactory.identifier().build());
+
+        // Check that files transfer failed.
+        assertThat(
+                uploaded,
+                willThrow(FileTransferException.class, "No files to upload")
+        );
+
+        // Check temporary files were deleted.
+        await().until(() -> targetNode.workDir().toFile().listFiles().length 
== 0);
+    }
+
+    @Test
+    void uploadNonReadableFiles() {
+        // Generate files on the source node.
+        Node sourceNode = cluster.members.get(0);
+
+        int chunkSize = configuration.value().chunkSize();
+        Path file1 = randomFile(sourceDir, chunkSize - 1);
+        Path file2 = randomFile(sourceDir, chunkSize + 1);
+        Path file3 = randomFile(sourceDir, chunkSize * 2);
+
+        // Make file2 non-readable.
+        assertTrue(file2.toFile().setReadable(false));
+
+        // Add file provider to the source node.
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> completedFuture(List.of(file1, file2, file3))
+        );
+
+        // Set file consumer on the target node.
+        Node targetNode = cluster.members.get(1);
+
+        // The future will be completed when the file consumer is called.
+        CompletableFuture<List<Path>> uploadedFilesFuture = new 
CompletableFuture<>();
+        targetNode.fileTransferringService().addFileConsumer(Identifier.class, 
((metadata, uploadedFiles) -> {
+            uploadedFilesFuture.complete(uploadedFiles);
+            return completedFuture(null);
+        }));
+
+        // Upload files on the target node from the source node.
+        Identifier identifier = messageFactory.identifier().build();
+        CompletableFuture<Void> upload = 
sourceNode.fileTransferringService().upload(targetNode.nodeName(), identifier);
+
+        // Check that files transfer failed.
+        assertThat(
+                upload,
+                willThrow(FileTransferException.class, "Failed to create a 
file transfer stream")
+        );
+
+        // Check the consumer was not called.
+        assertFalse(uploadedFilesFuture.isDone());
+
+        // Check temporary files were deleted.
+        await().until(() -> targetNode.workDir().toFile().listFiles().length 
== 0);
+    }
+
+    @Test
+    void uploadFilesWhenProviderThrowsException() {
+        // Set file provider on the source node that throws an exception.
+        Node sourceNode = cluster.members.get(0);
+
+        sourceNode.fileTransferringService().addFileProvider(
+                Identifier.class,
+                req -> failedFuture(new RuntimeException("Test exception"))
+        );
+
+        // Set file consumer on the target node.
+        Node targetNode = cluster.members.get(1);
+
+        // The future will be completed when the file consumer is called.
+        CompletableFuture<List<Path>> uploadedFilesFuture = new 
CompletableFuture<>();
+        targetNode.fileTransferringService().addFileConsumer(Identifier.class, 
((metadata, uploadedFiles) -> {
+            uploadedFilesFuture.complete(uploadedFiles);
+            return completedFuture(null);
+        }));
+
+        // Upload files on the target node from the source node.
+        CompletableFuture<Void> uploaded = sourceNode.fileTransferringService()
+                .upload(targetNode.nodeName(), 
messageFactory.identifier().build());
+
+        // Check that files transfer failed.
+        assertThat(
+                uploaded,
+                willThrow(RuntimeException.class, "Test exception")
+        );
+
+        // Check the consumer was not called.
+        assertFalse(uploadedFilesFuture.isDone());
+
+        // Check temporary files were deleted.
+        await().until(() -> targetNode.workDir().toFile().listFiles().length 
== 0);
+    }
+
+

Review Comment:
   Do we allow several consecutive empty lines? I thought this wouldn't pass 
checkstyle. Interesting...



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/MessagesUtils.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.stream.Collectors.toList;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferFactory;
+
+/**
+ * Utility class for file transfer messages.
+ */
+final class MessagesUtils {
+    private MessagesUtils() {
+        // No-op.
+    }
+
+    /**
+     * Extracts file headers from files.
+     *
+     * @param factory File transfer factory.
+     * @param paths List of paths.
+     * @return List of file headers.
+     */
+    static List<FileHeader> getHeaders(FileTransferFactory factory, List<Path> 
paths) {

Review Comment:
   Is it reasonable to extract such a small method into a utility class?
   And why do we store the index of an element inside the element? What's 
its purpose? It can be derived from the list.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/messages/Identifier.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file.messages;
+
+import org.apache.ignite.internal.network.file.FileConsumer;
+import org.apache.ignite.internal.network.file.FileProvider;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Files identifier. This interface is used to mark all identifier messages. 
Identifier messages are used to retrieve files from
+ * {@link FileProvider} and handle them on the receiving side by {@link 
FileConsumer}.
+ */
+@Transferable(FileTransferMessageType.FILE_IDENTIFIER)
+public interface Identifier extends NetworkMessage {

Review Comment:
   A marker interface doesn't need to be transferable itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to