rpuch commented on code in PR #2390:
URL: https://github.com/apache/ignite-3/pull/2390#discussion_r1281514633


##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/FileContentMatcher.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * {@link TypeSafeMatcher} for matching file content.
+ */
+public class FileContentMatcher extends TypeSafeMatcher<Path> {
+
+    private final byte[] expectedContent;
+
+    private FileContentMatcher(byte[] expectedContent) {
+        this.expectedContent = expectedContent;
+    }
+
+    /**
+     * Creates a matcher for matching file content.
+     *
+     * @param path Path to the file.
+     * @return Matcher for matching file content.
+     */
+    public static FileContentMatcher hasContent(Path path) {

Review Comment:
   How about adding a `Matcher` as a parameter? That would make the matcher 
more flexible. An overload without the `Matcher` parameter can still be left 
(it would use `is()` as a matcher)



##########
modules/file-transferring/src/testFixtures/java/org/apache/ignite/internal/network/file/FileAssertions.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static 
org.apache.ignite.internal.testframework.matchers.FileContentMatcher.hasContent;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * File assertions.
+ */
+public class FileAssertions {
+
+    /**
+     * Asserts that the content of the two directories is the same.
+     *
+     * @param expected Expected directory.
+     * @param actual Actual directory.
+     */
+    public static void assertContentEquals(Path expected, Path actual) {
+        try {
+            Files.walkFileTree(expected, new SimpleFileVisitor<>() {
+                @Override
+                public FileVisitResult preVisitDirectory(Path dir, 
BasicFileAttributes attrs) {
+                    Path otherDir = actual.resolve(expected.relativize(dir));
+                    if (!Files.exists(otherDir)) {
+                        throw new AssertionError("Directory " + dir + " does 
not exist in " + actual);
+                    }
+                    List<String> expectedFiles = 
Arrays.stream(dir.toFile().listFiles())

Review Comment:
   `File#listFiles()` is nullable. Probably a possibility for a null should be 
handled here



##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/FileContentMatcher.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * {@link TypeSafeMatcher} for matching file content.
+ */
+public class FileContentMatcher extends TypeSafeMatcher<Path> {
+
+    private final byte[] expectedContent;
+
+    private FileContentMatcher(byte[] expectedContent) {
+        this.expectedContent = expectedContent;
+    }
+
+    /**
+     * Creates a matcher for matching file content.
+     *
+     * @param path Path to the file.
+     * @return Matcher for matching file content.
+     */
+    public static FileContentMatcher hasContent(Path path) {
+        try {
+            return new FileContentMatcher(Files.readAllBytes(path));
+        } catch (IOException e) {
+            throw new RuntimeException("Could not read file content", e);
+        }
+    }
+
+    @Override
+    protected boolean matchesSafely(Path path) {
+        try {
+            byte[] actualContent = Files.readAllBytes(path);
+            return Arrays.equals(actualContent, expectedContent);
+        } catch (IOException e) {
+            throw new RuntimeException("Could not read file content", e);
+        }
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText("matches content: 
").appendValue(expectedContent);
+    }
+
+    @Override
+    protected void describeMismatchSafely(Path item, Description 
mismatchDescription) {
+        try {
+            mismatchDescription.appendText("was 
").appendValue(Files.readAllLines(item));

Review Comment:
   Why is it `readAllLines()` here, while on line 55 it's `readAllBytes()`?



##########
modules/file-transferring/src/integrationTest/java/org/apache/ignite/internal/network/file/ItFileTransferringTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.network.file.FileAssertions.assertContentEquals;
+import static org.apache.ignite.internal.network.file.FileGenerator.randomFile;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.file.TestCluster.Node;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.network.file.messages.TransferMetadataImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for file transferring.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItFileTransferringTest {
+
+    @WorkDirectory
+    private Path workDir;
+
+    private TestCluster cluster;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws InterruptedException {
+        cluster = new TestCluster(2, workDir, testInfo);
+        cluster.startAwait();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        cluster.shutdown();
+    }
+
+    @Test
+    void download() throws IOException {
+        Node node0 = cluster.members.get(0);
+
+        String unit = "unit";
+        String version = "1.0.0";
+        Path unitPath = node0.workDir().resolve(unit).resolve(version);
+        Files.createDirectories(unitPath);
+
+        File file1 = randomFile(unitPath, 1024 * 1024 * 10).toFile();

Review Comment:
   Should we delete the files after the test?



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringService.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+
+/**
+ * File transferring service.
+ */
+public interface FileTransferringService extends IgniteComponent {
+    /**
+     * Adds a file provider for the given metadata.
+     *
+     * @param metadata Metadata.
+     * @param provider Provider.
+     */
+    <M extends TransferMetadata> void addFileProvider(
+            Class<M> metadata,
+            FileProvider<M> provider
+    );
+
+    /**
+     * Adds a file handler for the given metadata.
+     *
+     * @param metadata Metadata.
+     * @param handler Handler.
+     */
+    <M extends TransferMetadata> void addFileHandler(Class<M> metadata, 
FileHandler<M> handler);

Review Comment:
   Don't we need a way to deregister a file handler (or a file provider)?



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringServiceImpl.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadResponseImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferringMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.util.FilesUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferringService}.
+ */
+public class FileTransferringServiceImpl implements FileTransferringService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferringServiceImpl.class);
+
+    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CONCURRENT_REQUESTS = 10;
+    private static final ChannelType FILE_TRANSFERRING_CHANNEL = 
ChannelType.register((short) 1, "FileTransferring");
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    private final Path tempDirectory;
+
+    private final FileSender fileSender;
+
+    private final Map<UUID, FileReceiver> transferIdToReceiver = new 
ConcurrentHashMap<>();
+    private final Map<Short, FileProvider<TransferMetadata>> 
metadataToProvider = new ConcurrentHashMap<>();

Review Comment:
   Why is it `Short`? It doesn't look like we can save a lot of memory this way



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringService.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+
+/**
+ * File transferring service.
+ */
+public interface FileTransferringService extends IgniteComponent {
+    /**
+     * Adds a file provider for the given metadata.
+     *
+     * @param metadata Metadata.
+     * @param provider Provider.
+     */
+    <M extends TransferMetadata> void addFileProvider(
+            Class<M> metadata,
+            FileProvider<M> provider
+    );
+
+    /**
+     * Adds a file handler for the given metadata.
+     *
+     * @param metadata Metadata.
+     * @param handler Handler.
+     */
+    <M extends TransferMetadata> void addFileHandler(Class<M> metadata, 
FileHandler<M> handler);
+
+    /**
+     * Downloads files for the given metadata.
+     *
+     * @param node Node.
+     * @param transferMetadata Metadata.
+     * @return Temporary path to the downloaded files. The caller is 
responsible for deleting the files.
+     */
+    CompletableFuture<Path> download(String node, TransferMetadata 
transferMetadata);
+
+    /**
+     * Uploads files for the given metadata.
+     *
+     * @param node Node.

Review Comment:
   Let's say explicitly that it's a consistent ID of a node



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileSender.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.network.NetworkMessage;
+
+class FileSender implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(FileSender.class);
+    private final int chunkSize;
+    private final RateLimiter rateLimiter;
+    private final BiFunction<String, NetworkMessage, CompletableFuture<Void>> 
send;
+
+    private final Queue<FileTransferringRequest> filesToSend = new 
ConcurrentLinkedQueue<>();
+
+    private final ExecutorService executorService = newSingleThreadExecutor(
+            new NamedThreadFactory("FileSenderExecutor", LOG)

Review Comment:
   Thread prefix must contain node name for easier identification



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringServiceImpl.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadResponseImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferringMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.util.FilesUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferringService}.
+ */
+public class FileTransferringServiceImpl implements FileTransferringService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferringServiceImpl.class);
+
+    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CONCURRENT_REQUESTS = 10;
+    private static final ChannelType FILE_TRANSFERRING_CHANNEL = 
ChannelType.register((short) 1, "FileTransferring");
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    private final Path tempDirectory;
+
+    private final FileSender fileSender;
+
+    private final Map<UUID, FileReceiver> transferIdToReceiver = new 
ConcurrentHashMap<>();
+    private final Map<Short, FileProvider<TransferMetadata>> 
metadataToProvider = new ConcurrentHashMap<>();
+    private final Map<Short, FileHandler<TransferMetadata>> metadataToHandler 
= new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param messagingService Messaging service.
+     * @param tempDirectory Temporary directory.
+     */
+    public FileTransferringServiceImpl(MessagingService messagingService, Path 
tempDirectory) {
+        this.messagingService = messagingService;
+        this.tempDirectory = tempDirectory;
+        this.fileSender = new FileSender(CHUNK_SIZE, new 
RateLimiter(CONCURRENT_REQUESTS), (recipientConsistentId, message) -> {
+            return messagingService.send(recipientConsistentId, 
FILE_TRANSFERRING_CHANNEL, message);
+        });
+    }
+
+    @Override
+    public void start() {
+        messagingService.addMessageHandler(FileTransferringMessageType.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof FileDownloadRequest) {
+                        processDownloadRequest((FileDownloadRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof FileUploadRequest) {
+                        processUploadRequest((FileUploadRequest) message);
+                    } else if (message instanceof FileTransferInfo) {
+                        processFileTransferInfo((FileTransferInfo) message);
+                    } else if (message instanceof FileHeader) {
+                        processFileHeader((FileHeader) message);
+                    } else if (message instanceof ChunkedFile) {
+                        processChunkedFile((ChunkedFile) message);
+                    }

Review Comment:
   Le's add a catch-all to make sure we see if anything is missed



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringMessagesStream.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.ChunkedFileImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileHeaderImpl;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfoImpl;
+import org.apache.ignite.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Stream of messages to send files.
+ */
+public class FileTransferringMessagesStream implements ManuallyCloseable {
+    private final UUID transferId;
+
+    private final Queue<File> filesToSend;
+
+    private final int chunkSize;
+
+    @Nullable
+    private ChunkedFileReader currFile;
+
+    private final AtomicReference<FileTransferInfo> fileTransferInfo = new 
AtomicReference<>();

Review Comment:
   Why is atomic used here? This class seems to be used in a single thread (at 
least, the queue is not thread-safe)



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileSender.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.network.NetworkMessage;
+
+class FileSender implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(FileSender.class);
+    private final int chunkSize;
+    private final RateLimiter rateLimiter;
+    private final BiFunction<String, NetworkMessage, CompletableFuture<Void>> 
send;
+
+    private final Queue<FileTransferringRequest> filesToSend = new 
ConcurrentLinkedQueue<>();
+
+    private final ExecutorService executorService = newSingleThreadExecutor(
+            new NamedThreadFactory("FileSenderExecutor", LOG)
+    );
+
+    FileSender(
+            int chunkSize,
+            RateLimiter rateLimiter,
+            BiFunction<String, NetworkMessage, CompletableFuture<Void>> send
+    ) {
+        this.send = send;
+        this.chunkSize = chunkSize;
+        this.rateLimiter = rateLimiter;
+    }
+
+    /**
+     * Adds files to the queue to be sent to the receiver.
+     */
+    CompletableFuture<Void> send(String receiverConsistentId, UUID id, 
List<File> files) {
+        FileTransferringRequest request = new 
FileTransferringRequest(receiverConsistentId, id, files, chunkSize);
+        filesToSend.add(request);
+        executorService.submit(this::processQueue);
+        return request.result();
+    }
+
+    /**
+     * Processes the queue of files to be sent.
+     */
+    private void processQueue() {
+        while (!filesToSend.isEmpty()) {
+            FileTransferringRequest request = filesToSend.peek();
+            try {
+                FileTransferringMessagesStream stream = request.messagesStream;
+                while (stream.hasNextMessage() && rateLimiter.tryAcquire()) {
+                    send.apply(request.receiverConsistentId, 
stream.nextMessage())
+                            .whenComplete((res, e) -> {
+                                if (e != null) {
+                                    request.complete(e);
+                                }
+                                rateLimiter.release();
+                            });
+                }
+                if (!stream.hasNextMessage()) {
+                    request.complete();
+                    filesToSend.remove();
+                }
+            } catch (Exception e) {
+                request.complete(e);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        executorService.shutdown();
+        filesToSend.forEach(it -> it.complete(new InterruptedException("File 
sender is closed")));

Review Comment:
   `InterruptedException` has a very specific meaning and does not seem 
appropriate here. `NodeStoppingException` (if it's about node shutdown time) or 
maybe just `IgniteInternalException` seems better



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileReceiver.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+class FileReceiver implements ManuallyCloseable {
+    private final Path dir;
+    private final AtomicInteger filesCount = new AtomicInteger(-1);
+    private final AtomicInteger filesFinished = new AtomicInteger(0);
+    private final CompletableFuture<Path> result = new CompletableFuture<>();
+    private final Map<String, ChunkedFileWriter> fileNameToWriter = new 
ConcurrentHashMap<>();
+    private final Map<String, Lock> fileNameToLock = new ConcurrentHashMap<>();
+
+    FileReceiver(Path dir) {
+        this.dir = dir;
+    }
+
+    void receive(FileTransferInfo info) {

Review Comment:
   There are three overloads of `receive()` method that do quite different 
things: they receive a 'whole transfer header' (once per transfer), a 'file 
header' (once per file, N times per transfer) and a 'file chunk' (M times per 
file). It seems that it would be better to name them differently (like 
`receiveTransferInfo()`, `receiveFileHeader()`, `receiveFileChunk()` to avoid 
confusion.



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringMessagesStream.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.ChunkedFileImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileHeaderImpl;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfoImpl;
+import org.apache.ignite.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Stream of messages to send files.
+ */
+public class FileTransferringMessagesStream implements ManuallyCloseable {
+    private final UUID transferId;
+
+    private final Queue<File> filesToSend;
+
+    private final int chunkSize;
+
+    @Nullable
+    private ChunkedFileReader currFile;
+
+    private final AtomicReference<FileTransferInfo> fileTransferInfo = new 
AtomicReference<>();
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    /**
+     * Creates a new stream of messages to send files.
+     *
+     * @param transferId the id of the stream.
+     * @param filesToSend the files to send. Must not be empty.
+     * @param chunkSize the size of the chunks to send. Must be positive.
+     */
+    FileTransferringMessagesStream(
+            UUID transferId,
+            List<File> filesToSend,
+            int chunkSize
+    ) {
+        if (filesToSend.isEmpty()) {
+            throw new IllegalArgumentException("Files to send cannot be 
empty.");
+        }
+
+        if (chunkSize <= 0) {
+            throw new IllegalArgumentException("Chunk size must be positive.");
+        }
+
+        this.transferId = transferId;
+        this.filesToSend = new LinkedList<>(filesToSend);
+        this.chunkSize = chunkSize;
+        this.fileTransferInfo.set(fileTransferInfo());
+    }
+
+    /**
+     * Returns true if there are more messages to send.
+     *
+     * @return true if there are more messages to send.
+     */
+    boolean hasNextMessage() throws IOException {
+        // check that the stream is not closed.
+        if (closed.get()) {
+            return false;
+        } else {
+            // check that there are more messages to send.
+            // 1. there is a file transfer info message to send.
+            // 2. there are files to send.
+            // 3. there is a current file to send.
+            return fileTransferInfo.get() != null || !filesToSend.isEmpty() || 
(currFile != null && !currFile.isFinished());
+        }
+    }
+
+    /**
+     * Returns the next message to send.
+     *
+     * @return the next message to send.
+     * @throws IOException if an I/O error occurs.
+     * @throws IllegalStateException if there are no more messages to send.
+     */
+    NetworkMessage nextMessage() throws IOException {
+        if (!hasNextMessage()) {
+            throw new IllegalStateException("There are no more messages to 
send.");
+        }
+
+        FileTransferInfo info = fileTransferInfo.getAndSet(null);
+        if (info != null) {
+            return info;
+        } else {
+            if (currFile == null || currFile.isFinished()) {
+                openNextFile();
+                return header();
+            } else {
+                return nextChunk();
+            }
+        }
+    }
+
+    private FileTransferInfo fileTransferInfo() {
+        return FileTransferInfoImpl.builder()
+                .transferId(transferId)
+                .filesCount(filesToSend.size())
+                .build();
+    }
+
+    /**
+     * Returns the header of the current file to send.
+     */
+    private FileHeader header() throws IOException {
+        assert currFile != null : "Current file is null.";
+
+        return FileHeaderImpl.builder()
+                .transferId(transferId)
+                .fileName(currFile.fileName())
+                .fileSize(currFile.length())
+                .build();
+    }
+
+    /**
+     * Returns the next chunk of the current file. Throws an exception if the 
current file is finished.
+     *
+     * @return the next chunk of the current file.
+     * @throws IOException if an I/O error occurs.
+     * @throws IllegalStateException if the current file is finished.
+     */
+    private ChunkedFile nextChunk() throws IOException {
+        assert currFile != null : "Current file is null.";
+        assert !currFile.isFinished() : "Current file is finished.";
+
+        return ChunkedFileImpl.builder()

Review Comment:
   It looks like it's not a `ChunkedFile`, but a `FileChunk`



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileReceiver.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+class FileReceiver implements ManuallyCloseable {
+    private final Path dir;
+    private final AtomicInteger filesCount = new AtomicInteger(-1);
+    private final AtomicInteger filesFinished = new AtomicInteger(0);
+    private final CompletableFuture<Path> result = new CompletableFuture<>();
+    private final Map<String, ChunkedFileWriter> fileNameToWriter = new 
ConcurrentHashMap<>();
+    private final Map<String, Lock> fileNameToLock = new ConcurrentHashMap<>();
+
+    FileReceiver(Path dir) {
+        this.dir = dir;
+    }
+
+    void receive(FileTransferInfo info) {
+        if (result.isDone()) {
+            throw new IllegalStateException("Received file transfer info after 
result is already done.");
+        }
+        filesCount.set(info.filesCount());
+    }
+
+    void receive(FileHeader header) {
+        if (result.isDone()) {
+            throw new IllegalStateException("Received file header after result 
is already done.");
+        }
+        doInLock(header.fileName(), () -> receive0(header));
+    }
+
+    void receive(ChunkedFile chunkedFile) {
+        if (result.isDone()) {
+            throw new IllegalStateException("Received chunked file after 
result is already done.");
+        }
+        doInLock(chunkedFile.fileName(), () -> receive0(chunkedFile));
+    }
+
+    private void receive0(FileHeader header) {
+        try {
+            Path path = Files.createFile(dir.resolve(header.fileName()));
+            fileNameToWriter.put(header.fileName(), new 
ChunkedFileWriter(path, header.fileSize()));
+        } catch (IOException e) {
+            result.completeExceptionally(e);
+        }
+    }
+
+    private void receive0(ChunkedFile chunkedFile) {
+        try {
+            ChunkedFileWriter writer = 
fileNameToWriter.get(chunkedFile.fileName());
+            writer.write(chunkedFile);
+
+            if (writer.isFinished()) {
+                writer.close();
+                fileNameToWriter.remove(chunkedFile.fileName());
+                filesFinished.incrementAndGet();
+            }
+
+            if (filesFinished.get() == filesCount.get()) {
+                result.complete(dir);
+            }
+
+        } catch (IOException e) {
+            result.completeExceptionally(e);

Review Comment:
   Should the writers be closed here as well?



##########
modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java:
##########
@@ -76,6 +76,26 @@ default CompletableFuture<Void> send(ClusterNode recipient, 
NetworkMessage msg)
      */
     CompletableFuture<Void> send(ClusterNode recipient, ChannelType 
channelType, NetworkMessage msg);
 
+    /**
+     * Tries to send the given message via specified channel asynchronously to 
the specific cluster member.
+     *
+     * <p>Guarantees:

Review Comment:
   Does `ChannelType` affect these guarantees? I mean, if m1 was sent using an 
overload without the `ChannelType`, but m2 was sent using an overload with a 
`ChannelType`, will the guarantees still hold?



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringService.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+
+/**
+ * File transferring service.
+ */
+public interface FileTransferringService extends IgniteComponent {
+    /**
+     * Adds a file provider for the given metadata.
+     *
+     * @param metadata Metadata.
+     * @param provider Provider.
+     */
+    <M extends TransferMetadata> void addFileProvider(
+            Class<M> metadata,
+            FileProvider<M> provider
+    );
+
+    /**
+     * Adds a file handler for the given metadata.
+     *
+     * @param metadata Metadata.
+     * @param handler Handler.
+     */
+    <M extends TransferMetadata> void addFileHandler(Class<M> metadata, 
FileHandler<M> handler);
+
+    /**
+     * Downloads files for the given metadata.
+     *
+     * @param node Node.

Review Comment:
   Let's say explicitly that it's a consistent ID of a node



##########
modules/file-transferring/build.gradle:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply from: "$rootDir/buildscripts/java-core.gradle"
+apply from: "$rootDir/buildscripts/publishing.gradle"
+apply from: "$rootDir/buildscripts/java-junit5.gradle"
+apply from: "$rootDir/buildscripts/java-integration-test.gradle"
+apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
+

Review Comment:
   Would `file-transfer` be a better name for the module?



##########
modules/file-transferring/src/integrationTest/java/org/apache/ignite/internal/network/file/ItFileTransferringTest.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.network.file.FileAssertions.assertContentEquals;
+import static org.apache.ignite.internal.network.file.FileGenerator.randomFile;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.file.TestCluster.Node;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.network.file.messages.TransferMetadataImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for file transferring.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItFileTransferringTest {
+
+    @WorkDirectory
+    private Path workDir;
+
+    private TestCluster cluster;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws InterruptedException {
+        cluster = new TestCluster(2, workDir, testInfo);
+        cluster.startAwait();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        cluster.shutdown();
+    }
+
+    @Test
+    void download() throws IOException {
+        Node node0 = cluster.members.get(0);
+
+        String unit = "unit";
+        String version = "1.0.0";
+        Path unitPath = node0.workDir().resolve(unit).resolve(version);
+        Files.createDirectories(unitPath);
+
+        File file1 = randomFile(unitPath, 1024 * 1024 * 10).toFile();
+        File file2 = randomFile(unitPath, 1024 * 1024 * 10 + 1).toFile();
+        File file3 = randomFile(unitPath, 1024 * 1024 * 10 - 1).toFile();
+
+        node0.fileTransferringService().addFileProvider(
+                TransferMetadata.class,
+                req -> completedFuture(List.of(file1, file2, file3))
+        );
+
+        Node node1 = cluster.members.get(1);
+        CompletableFuture<Path> download = 
node1.fileTransferringService().download(
+                node0.nodeName(),
+                TransferMetadataImpl.builder().build()
+        );
+        assertThat(download.thenAccept(path -> assertContentEquals(unitPath, 
path)), CompletableFutureMatcher.willCompleteSuccessfully());
+    }
+
+    @Test
+    void upload() throws IOException {
+        Node node0 = cluster.members.get(0);
+
+        String unit = "unit";
+        String version = "1.0.0";
+        Path unitPath = node0.workDir().resolve(unit).resolve(version);
+        Files.createDirectories(unitPath);
+
+        File file1 = randomFile(unitPath, 1024 * 1024 * 10).toFile();
+        File file2 = randomFile(unitPath, 1024 * 1024 * 10 + 1).toFile();
+        File file3 = randomFile(unitPath, 1024 * 1024 * 10 - 1).toFile();
+
+        node0.fileTransferringService().addFileProvider(
+                TransferMetadata.class,
+                req -> completedFuture(List.of(file1, file2, file3))
+        );
+
+        Node node1 = cluster.members.get(1);
+
+        CompletableFuture<Path> future = new CompletableFuture<>();
+        node1.fileTransferringService().addFileHandler(TransferMetadata.class, 
((metadata, uploadedFile) -> {
+            future.complete(uploadedFile);

Review Comment:
   It looks like the future is completed with a directory path, not with a file 
path. It would be nice if the naming reflected this



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringServiceImpl.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadResponseImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferringMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.util.FilesUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferringService}.
+ */
+public class FileTransferringServiceImpl implements FileTransferringService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferringServiceImpl.class);
+
+    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CONCURRENT_REQUESTS = 10;
+    private static final ChannelType FILE_TRANSFERRING_CHANNEL = 
ChannelType.register((short) 1, "FileTransferring");
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    private final Path tempDirectory;
+
+    private final FileSender fileSender;
+
+    private final Map<UUID, FileReceiver> transferIdToReceiver = new 
ConcurrentHashMap<>();
+    private final Map<Short, FileProvider<TransferMetadata>> 
metadataToProvider = new ConcurrentHashMap<>();
+    private final Map<Short, FileHandler<TransferMetadata>> metadataToHandler 
= new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param messagingService Messaging service.
+     * @param tempDirectory Temporary directory.
+     */
+    public FileTransferringServiceImpl(MessagingService messagingService, Path 
tempDirectory) {
+        this.messagingService = messagingService;
+        this.tempDirectory = tempDirectory;
+        this.fileSender = new FileSender(CHUNK_SIZE, new 
RateLimiter(CONCURRENT_REQUESTS), (recipientConsistentId, message) -> {
+            return messagingService.send(recipientConsistentId, 
FILE_TRANSFERRING_CHANNEL, message);
+        });
+    }
+
+    @Override
+    public void start() {
+        messagingService.addMessageHandler(FileTransferringMessageType.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof FileDownloadRequest) {
+                        processDownloadRequest((FileDownloadRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof FileUploadRequest) {
+                        processUploadRequest((FileUploadRequest) message);
+                    } else if (message instanceof FileTransferInfo) {
+                        processFileTransferInfo((FileTransferInfo) message);
+                    } else if (message instanceof FileHeader) {
+                        processFileHeader((FileHeader) message);
+                    } else if (message instanceof ChunkedFile) {
+                        processChunkedFile((ChunkedFile) message);
+                    }
+                });
+    }
+
+    private void processUploadRequest(FileUploadRequest message) {
+        createFileReceiver(message.transferId())
+                .thenCompose(receiver -> 
handleFileTransferring(message.transferId(), receiver))
+                .thenCompose(path -> {
+                    return 
metadataToHandler.get(message.metadata().messageType()).handleUpload(message.metadata(),
 path)
+                            .whenComplete((v, e) -> {
+                                if (e != null) {
+                                    LOG.error("Failed to handle file upload. 
Metadata: {}", message.metadata(), e);
+                                }
+
+                                deleteDirectoryIfExists(path);
+                            });
+                });
+    }
+
+    private void processDownloadRequest(FileDownloadRequest message, String 
senderConsistentId, Long correlationId) {
+        
metadataToProvider.get(message.metadata().messageType()).files(message.metadata())
+                .handle((files, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to get files for download. Metadata: 
{}", message.metadata(), e);
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    if (files.isEmpty()) {
+                        LOG.error("No files found for download. Metadata: {}", 
message.metadata());
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    FileDownloadResponse response = 
FileDownloadResponseImpl.builder()
+                            .build();
+
+                    return messagingService.respond(senderConsistentId, 
FILE_TRANSFERRING_CHANNEL, response, correlationId)
+                            .thenCompose(v -> send(senderConsistentId, 
message.transferId(), files));
+                })
+                .thenCompose(it -> it);
+    }
+
+    private CompletableFuture<Void> send(String recipientConsistentId, UUID 
transferId, List<File> files) {
+        return fileSender.send(recipientConsistentId, transferId, files)
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to send files to node: {}. 
Exception: {}", recipientConsistentId, e);
+                        // todo send error response
+                    }
+                });
+    }
+
+    private void processFileTransferInfo(FileTransferInfo info) {
+        FileReceiver receiver = transferIdToReceiver.get(info.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for file transfer info: {}", 
info);
+        } else {
+            receiver.receive(info);
+        }
+    }
+
+    private void processFileHeader(FileHeader header) {
+        FileReceiver receiver = transferIdToReceiver.get(header.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for file header: {}", header);
+        } else {
+            receiver.receive(header);
+        }
+    }
+
+    private void processChunkedFile(ChunkedFile chunkedFile) {
+        FileReceiver receiver = 
transferIdToReceiver.get(chunkedFile.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for chunked file: {}", 
chunkedFile);
+        } else {
+            receiver.receive(chunkedFile);
+        }
+    }
+
+    private CompletableFuture<FileReceiver> createFileReceiver(UUID 
transferId) {
+        try {

Review Comment:
   The method is completely synchronous, why does it return a future?



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileSender.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.network.NetworkMessage;
+
+class FileSender implements ManuallyCloseable {
+    private static final IgniteLogger LOG = Loggers.forClass(FileSender.class);
+    private final int chunkSize;
+    private final RateLimiter rateLimiter;
+    private final BiFunction<String, NetworkMessage, CompletableFuture<Void>> 
send;
+
+    private final Queue<FileTransferringRequest> filesToSend = new 
ConcurrentLinkedQueue<>();
+
+    private final ExecutorService executorService = newSingleThreadExecutor(
+            new NamedThreadFactory("FileSenderExecutor", LOG)
+    );
+
+    FileSender(
+            int chunkSize,
+            RateLimiter rateLimiter,
+            BiFunction<String, NetworkMessage, CompletableFuture<Void>> send
+    ) {
+        this.send = send;
+        this.chunkSize = chunkSize;
+        this.rateLimiter = rateLimiter;
+    }
+
+    /**
+     * Adds files to the queue to be sent to the receiver.
+     */
+    CompletableFuture<Void> send(String receiverConsistentId, UUID id, 
List<File> files) {
+        FileTransferringRequest request = new 
FileTransferringRequest(receiverConsistentId, id, files, chunkSize);
+        filesToSend.add(request);
+        executorService.submit(this::processQueue);
+        return request.result();
+    }
+
+    /**
+     * Processes the queue of files to be sent.
+     */
+    private void processQueue() {
+        while (!filesToSend.isEmpty()) {
+            FileTransferringRequest request = filesToSend.peek();
+            try {
+                FileTransferringMessagesStream stream = request.messagesStream;
+                while (stream.hasNextMessage() && rateLimiter.tryAcquire()) {
+                    send.apply(request.receiverConsistentId, 
stream.nextMessage())
+                            .whenComplete((res, e) -> {
+                                if (e != null) {
+                                    request.complete(e);
+                                }
+                                rateLimiter.release();
+                            });
+                }
+                if (!stream.hasNextMessage()) {
+                    request.complete();
+                    filesToSend.remove();
+                }
+            } catch (Exception e) {
+                request.complete(e);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        executorService.shutdown();

Review Comment:
   Please use `IgniteUtils#shutdownAndAwaitTermination()`



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringServiceImpl.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadResponseImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferringMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.util.FilesUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferringService}.
+ */
+public class FileTransferringServiceImpl implements FileTransferringService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferringServiceImpl.class);
+
+    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CONCURRENT_REQUESTS = 10;
+    private static final ChannelType FILE_TRANSFERRING_CHANNEL = 
ChannelType.register((short) 1, "FileTransferring");
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    private final Path tempDirectory;
+
+    private final FileSender fileSender;
+
+    private final Map<UUID, FileReceiver> transferIdToReceiver = new 
ConcurrentHashMap<>();
+    private final Map<Short, FileProvider<TransferMetadata>> 
metadataToProvider = new ConcurrentHashMap<>();
+    private final Map<Short, FileHandler<TransferMetadata>> metadataToHandler 
= new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param messagingService Messaging service.
+     * @param tempDirectory Temporary directory.
+     */
+    public FileTransferringServiceImpl(MessagingService messagingService, Path 
tempDirectory) {
+        this.messagingService = messagingService;
+        this.tempDirectory = tempDirectory;
+        this.fileSender = new FileSender(CHUNK_SIZE, new 
RateLimiter(CONCURRENT_REQUESTS), (recipientConsistentId, message) -> {
+            return messagingService.send(recipientConsistentId, 
FILE_TRANSFERRING_CHANNEL, message);
+        });
+    }
+
+    @Override
+    public void start() {
+        messagingService.addMessageHandler(FileTransferringMessageType.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof FileDownloadRequest) {
+                        processDownloadRequest((FileDownloadRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof FileUploadRequest) {
+                        processUploadRequest((FileUploadRequest) message);
+                    } else if (message instanceof FileTransferInfo) {
+                        processFileTransferInfo((FileTransferInfo) message);
+                    } else if (message instanceof FileHeader) {
+                        processFileHeader((FileHeader) message);
+                    } else if (message instanceof ChunkedFile) {
+                        processChunkedFile((ChunkedFile) message);
+                    }
+                });
+    }
+
+    private void processUploadRequest(FileUploadRequest message) {
+        createFileReceiver(message.transferId())
+                .thenCompose(receiver -> 
handleFileTransferring(message.transferId(), receiver))
+                .thenCompose(path -> {
+                    return 
metadataToHandler.get(message.metadata().messageType()).handleUpload(message.metadata(),
 path)
+                            .whenComplete((v, e) -> {
+                                if (e != null) {
+                                    LOG.error("Failed to handle file upload. 
Metadata: {}", message.metadata(), e);
+                                }
+
+                                deleteDirectoryIfExists(path);
+                            });
+                });
+    }
+
+    private void processDownloadRequest(FileDownloadRequest message, String 
senderConsistentId, Long correlationId) {
+        
metadataToProvider.get(message.metadata().messageType()).files(message.metadata())
+                .handle((files, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to get files for download. Metadata: 
{}", message.metadata(), e);
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    if (files.isEmpty()) {
+                        LOG.error("No files found for download. Metadata: {}", 
message.metadata());
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    FileDownloadResponse response = 
FileDownloadResponseImpl.builder()
+                            .build();
+
+                    return messagingService.respond(senderConsistentId, 
FILE_TRANSFERRING_CHANNEL, response, correlationId)
+                            .thenCompose(v -> send(senderConsistentId, 
message.transferId(), files));
+                })
+                .thenCompose(it -> it);
+    }
+
+    private CompletableFuture<Void> send(String recipientConsistentId, UUID 
transferId, List<File> files) {
+        return fileSender.send(recipientConsistentId, transferId, files)
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to send files to node: {}. 
Exception: {}", recipientConsistentId, e);
+                        // todo send error response
+                    }
+                });
+    }
+
+    private void processFileTransferInfo(FileTransferInfo info) {
+        FileReceiver receiver = transferIdToReceiver.get(info.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for file transfer info: {}", 
info);
+        } else {
+            receiver.receive(info);
+        }
+    }
+
+    private void processFileHeader(FileHeader header) {
+        FileReceiver receiver = transferIdToReceiver.get(header.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for file header: {}", header);
+        } else {
+            receiver.receive(header);
+        }
+    }
+
+    private void processChunkedFile(ChunkedFile chunkedFile) {
+        FileReceiver receiver = 
transferIdToReceiver.get(chunkedFile.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for chunked file: {}", 
chunkedFile);
+        } else {
+            receiver.receive(chunkedFile);
+        }
+    }
+
+    private CompletableFuture<FileReceiver> createFileReceiver(UUID 
transferId) {
+        try {
+            Path directory = 
Files.createDirectory(tempDirectory.resolve(transferId.toString()));
+            FileReceiver receiver = new FileReceiver(directory);
+            transferIdToReceiver.put(transferId, receiver);
+            return completedFuture(receiver);
+        } catch (IOException e) {
+            return failedFuture(e);
+        }
+    }
+
+    private CompletableFuture<Path> handleFileTransferring(UUID transferId, 
FileReceiver fileReceiver) {
+        return fileReceiver.result()
+                .whenComplete((v, e) -> {
+                    transferIdToReceiver.remove(transferId);
+                    try {
+                        fileReceiver.close();
+                    } catch (Exception ex) {
+                        LOG.error("Failed to close file receiver: {}. 
Exception: {}", fileReceiver, ex);
+                    }
+                    if (e != null) {
+                        LOG.error("Failed to receive file. Id: {}. Exception: 
{}", transferId, e);
+                        deleteDirectoryIfExists(fileReceiver.dir());
+                    }
+                });
+    }
+
+    private static void deleteDirectoryIfExists(Path directory) {
+        try {
+            FilesUtils.deleteDirectoryIfExists(directory);
+        } catch (IOException e) {
+            LOG.error("Failed to delete directory: {}. Exception: {}", 
directory, e);
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        IgniteUtils.closeAllManually(Stream.concat(
+                transferIdToReceiver.values().stream(),
+                Stream.of(fileSender)
+        ));
+    }
+
+    @Override
+    public <M extends TransferMetadata> void addFileProvider(
+            Class<M> metadata,
+            FileProvider<M> provider
+    ) {
+        metadataToProvider.put(
+                metadata.getAnnotation(Transferable.class).value(),
+                (FileProvider<TransferMetadata>) provider
+        );
+    }
+
+    @Override
+    public <M extends TransferMetadata> void addFileHandler(
+            Class<M> metadata,
+            FileHandler<M> handler
+    ) {
+        metadataToHandler.put(
+                metadata.getAnnotation(Transferable.class).value(),
+                (FileHandler<TransferMetadata>) handler
+        );
+    }
+
+    @Override
+    public CompletableFuture<Path> download(String node, TransferMetadata 
transferMetadata) {
+        UUID transferId = UUID.randomUUID();
+        FileDownloadRequest message = FileDownloadRequestImpl.builder()

Review Comment:
   We usually use the corresponding factory to instantiate a builder



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringServiceImpl.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadResponseImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferringMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.util.FilesUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferringService}.
+ */
+public class FileTransferringServiceImpl implements FileTransferringService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferringServiceImpl.class);
+
+    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CONCURRENT_REQUESTS = 10;
+    private static final ChannelType FILE_TRANSFERRING_CHANNEL = 
ChannelType.register((short) 1, "FileTransferring");
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    private final Path tempDirectory;
+
+    private final FileSender fileSender;
+
+    private final Map<UUID, FileReceiver> transferIdToReceiver = new 
ConcurrentHashMap<>();
+    private final Map<Short, FileProvider<TransferMetadata>> 
metadataToProvider = new ConcurrentHashMap<>();
+    private final Map<Short, FileHandler<TransferMetadata>> metadataToHandler 
= new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param messagingService Messaging service.
+     * @param tempDirectory Temporary directory.
+     */
+    public FileTransferringServiceImpl(MessagingService messagingService, Path 
tempDirectory) {
+        this.messagingService = messagingService;
+        this.tempDirectory = tempDirectory;
+        this.fileSender = new FileSender(CHUNK_SIZE, new 
RateLimiter(CONCURRENT_REQUESTS), (recipientConsistentId, message) -> {
+            return messagingService.send(recipientConsistentId, 
FILE_TRANSFERRING_CHANNEL, message);
+        });
+    }
+
+    @Override
+    public void start() {
+        messagingService.addMessageHandler(FileTransferringMessageType.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof FileDownloadRequest) {
+                        processDownloadRequest((FileDownloadRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof FileUploadRequest) {
+                        processUploadRequest((FileUploadRequest) message);
+                    } else if (message instanceof FileTransferInfo) {
+                        processFileTransferInfo((FileTransferInfo) message);
+                    } else if (message instanceof FileHeader) {
+                        processFileHeader((FileHeader) message);
+                    } else if (message instanceof ChunkedFile) {
+                        processChunkedFile((ChunkedFile) message);
+                    }
+                });
+    }
+
+    private void processUploadRequest(FileUploadRequest message) {
+        createFileReceiver(message.transferId())
+                .thenCompose(receiver -> 
handleFileTransferring(message.transferId(), receiver))
+                .thenCompose(path -> {
+                    return 
metadataToHandler.get(message.metadata().messageType()).handleUpload(message.metadata(),
 path)
+                            .whenComplete((v, e) -> {
+                                if (e != null) {
+                                    LOG.error("Failed to handle file upload. 
Metadata: {}", message.metadata(), e);
+                                }
+
+                                deleteDirectoryIfExists(path);
+                            });
+                });
+    }
+
+    private void processDownloadRequest(FileDownloadRequest message, String 
senderConsistentId, Long correlationId) {
+        
metadataToProvider.get(message.metadata().messageType()).files(message.metadata())
+                .handle((files, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to get files for download. Metadata: 
{}", message.metadata(), e);
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    if (files.isEmpty()) {
+                        LOG.error("No files found for download. Metadata: {}", 
message.metadata());
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    FileDownloadResponse response = 
FileDownloadResponseImpl.builder()
+                            .build();
+
+                    return messagingService.respond(senderConsistentId, 
FILE_TRANSFERRING_CHANNEL, response, correlationId)
+                            .thenCompose(v -> send(senderConsistentId, 
message.transferId(), files));
+                })
+                .thenCompose(it -> it);
+    }
+
+    private CompletableFuture<Void> send(String recipientConsistentId, UUID 
transferId, List<File> files) {
+        return fileSender.send(recipientConsistentId, transferId, files)
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to send files to node: {}. 
Exception: {}", recipientConsistentId, e);
+                        // todo send error response
+                    }
+                });
+    }
+
+    private void processFileTransferInfo(FileTransferInfo info) {
+        FileReceiver receiver = transferIdToReceiver.get(info.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for file transfer info: {}", 
info);
+        } else {
+            receiver.receive(info);
+        }
+    }
+
+    private void processFileHeader(FileHeader header) {
+        FileReceiver receiver = transferIdToReceiver.get(header.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for file header: {}", header);
+        } else {
+            receiver.receive(header);
+        }
+    }
+
+    private void processChunkedFile(ChunkedFile chunkedFile) {
+        FileReceiver receiver = 
transferIdToReceiver.get(chunkedFile.transferId());
+        if (receiver == null) {
+            LOG.warn("File receiver is not found for chunked file: {}", 
chunkedFile);
+        } else {
+            receiver.receive(chunkedFile);
+        }
+    }
+
+    private CompletableFuture<FileReceiver> createFileReceiver(UUID 
transferId) {
+        try {
+            Path directory = 
Files.createDirectory(tempDirectory.resolve(transferId.toString()));
+            FileReceiver receiver = new FileReceiver(directory);
+            transferIdToReceiver.put(transferId, receiver);
+            return completedFuture(receiver);
+        } catch (IOException e) {
+            return failedFuture(e);
+        }
+    }
+
+    private CompletableFuture<Path> handleFileTransferring(UUID transferId, 
FileReceiver fileReceiver) {
+        return fileReceiver.result()
+                .whenComplete((v, e) -> {
+                    transferIdToReceiver.remove(transferId);
+                    try {
+                        fileReceiver.close();
+                    } catch (Exception ex) {
+                        LOG.error("Failed to close file receiver: {}. 
Exception: {}", fileReceiver, ex);
+                    }
+                    if (e != null) {
+                        LOG.error("Failed to receive file. Id: {}. Exception: 
{}", transferId, e);
+                        deleteDirectoryIfExists(fileReceiver.dir());
+                    }
+                });
+    }
+
+    private static void deleteDirectoryIfExists(Path directory) {
+        try {
+            FilesUtils.deleteDirectoryIfExists(directory);
+        } catch (IOException e) {
+            LOG.error("Failed to delete directory: {}. Exception: {}", 
directory, e);
+        }
+    }
+
+    @Override
+    public void stop() throws Exception {
+        IgniteUtils.closeAllManually(Stream.concat(
+                transferIdToReceiver.values().stream(),
+                Stream.of(fileSender)
+        ));
+    }
+
+    @Override
+    public <M extends TransferMetadata> void addFileProvider(
+            Class<M> metadata,
+            FileProvider<M> provider
+    ) {
+        metadataToProvider.put(
+                metadata.getAnnotation(Transferable.class).value(),
+                (FileProvider<TransferMetadata>) provider
+        );
+    }
+
+    @Override
+    public <M extends TransferMetadata> void addFileHandler(
+            Class<M> metadata,
+            FileHandler<M> handler
+    ) {
+        metadataToHandler.put(
+                metadata.getAnnotation(Transferable.class).value(),
+                (FileHandler<TransferMetadata>) handler
+        );
+    }
+
+    @Override
+    public CompletableFuture<Path> download(String node, TransferMetadata 
transferMetadata) {
+        UUID transferId = UUID.randomUUID();
+        FileDownloadRequest message = FileDownloadRequestImpl.builder()
+                .transferId(transferId)
+                .metadata(transferMetadata)
+                .build();
+
+        return createFileReceiver(message.transferId())
+                .thenCompose(fileReceiver -> messagingService.invoke(node, 
FILE_TRANSFERRING_CHANNEL, message, Long.MAX_VALUE)

Review Comment:
   Why is `MAX_VALUE` used as a timeout? This operation should probably be fast



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileWriter.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Path;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+
+class ChunkedFileWriter implements ManuallyCloseable {
+    private final long fileSize;
+    private final RandomAccessFile raf;
+    private final Queue<ChunkedFile> chunks = new 
PriorityQueue<>(ChunkedFile.COMPARATOR);
+
+    ChunkedFileWriter(Path path, long fileSize) throws FileNotFoundException {
+        this.raf = new RandomAccessFile(path.toFile(), "rw");

Review Comment:
   Is 'r' really needed here?



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringServiceImpl.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadResponseImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferringMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.util.FilesUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferringService}.
+ */
+public class FileTransferringServiceImpl implements FileTransferringService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferringServiceImpl.class);
+
+    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CONCURRENT_REQUESTS = 10;
+    private static final ChannelType FILE_TRANSFERRING_CHANNEL = 
ChannelType.register((short) 1, "FileTransferring");
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    private final Path tempDirectory;
+
+    private final FileSender fileSender;
+
+    private final Map<UUID, FileReceiver> transferIdToReceiver = new 
ConcurrentHashMap<>();
+    private final Map<Short, FileProvider<TransferMetadata>> 
metadataToProvider = new ConcurrentHashMap<>();
+    private final Map<Short, FileHandler<TransferMetadata>> metadataToHandler 
= new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param messagingService Messaging service.
+     * @param tempDirectory Temporary directory.
+     */
+    public FileTransferringServiceImpl(MessagingService messagingService, Path 
tempDirectory) {
+        this.messagingService = messagingService;
+        this.tempDirectory = tempDirectory;
+        this.fileSender = new FileSender(CHUNK_SIZE, new 
RateLimiter(CONCURRENT_REQUESTS), (recipientConsistentId, message) -> {
+            return messagingService.send(recipientConsistentId, 
FILE_TRANSFERRING_CHANNEL, message);
+        });
+    }
+
+    @Override
+    public void start() {
+        messagingService.addMessageHandler(FileTransferringMessageType.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof FileDownloadRequest) {
+                        processDownloadRequest((FileDownloadRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof FileUploadRequest) {
+                        processUploadRequest((FileUploadRequest) message);
+                    } else if (message instanceof FileTransferInfo) {
+                        processFileTransferInfo((FileTransferInfo) message);
+                    } else if (message instanceof FileHeader) {
+                        processFileHeader((FileHeader) message);
+                    } else if (message instanceof ChunkedFile) {
+                        processChunkedFile((ChunkedFile) message);
+                    }
+                });
+    }
+
+    private void processUploadRequest(FileUploadRequest message) {
+        createFileReceiver(message.transferId())
+                .thenCompose(receiver -> 
handleFileTransferring(message.transferId(), receiver))
+                .thenCompose(path -> {
+                    return 
metadataToHandler.get(message.metadata().messageType()).handleUpload(message.metadata(),
 path)
+                            .whenComplete((v, e) -> {
+                                if (e != null) {
+                                    LOG.error("Failed to handle file upload. 
Metadata: {}", message.metadata(), e);
+                                }
+
+                                deleteDirectoryIfExists(path);
+                            });
+                });
+    }
+
+    private void processDownloadRequest(FileDownloadRequest message, String 
senderConsistentId, Long correlationId) {
+        
metadataToProvider.get(message.metadata().messageType()).files(message.metadata())
+                .handle((files, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to get files for download. Metadata: 
{}", message.metadata(), e);
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    if (files.isEmpty()) {
+                        LOG.error("No files found for download. Metadata: {}", 
message.metadata());
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    FileDownloadResponse response = 
FileDownloadResponseImpl.builder()
+                            .build();
+
+                    return messagingService.respond(senderConsistentId, 
FILE_TRANSFERRING_CHANNEL, response, correlationId)
+                            .thenCompose(v -> send(senderConsistentId, 
message.transferId(), files));
+                })
+                .thenCompose(it -> it);
+    }
+
+    private CompletableFuture<Void> send(String recipientConsistentId, UUID 
transferId, List<File> files) {

Review Comment:
   I suggest renaming the method to something like `sendFiles()` to make sure 
there is no confusion with `MessagingService#send()`



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringMessagesStream.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.ChunkedFileImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileHeaderImpl;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfoImpl;
+import org.apache.ignite.network.NetworkMessage;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Stream of messages to send files.
+ */
+public class FileTransferringMessagesStream implements ManuallyCloseable {
+    private final UUID transferId;
+
+    private final Queue<File> filesToSend;
+
+    private final int chunkSize;
+
+    @Nullable
+    private ChunkedFileReader currFile;
+
+    private final AtomicReference<FileTransferInfo> fileTransferInfo = new 
AtomicReference<>();
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    /**
+     * Creates a new stream of messages to send files.
+     *
+     * @param transferId the id of the stream.
+     * @param filesToSend the files to send. Must not be empty.
+     * @param chunkSize the size of the chunks to send. Must be positive.
+     */
+    FileTransferringMessagesStream(
+            UUID transferId,
+            List<File> filesToSend,
+            int chunkSize
+    ) {
+        if (filesToSend.isEmpty()) {
+            throw new IllegalArgumentException("Files to send cannot be 
empty.");
+        }
+
+        if (chunkSize <= 0) {
+            throw new IllegalArgumentException("Chunk size must be positive.");
+        }
+
+        this.transferId = transferId;
+        this.filesToSend = new LinkedList<>(filesToSend);
+        this.chunkSize = chunkSize;
+        this.fileTransferInfo.set(fileTransferInfo());
+    }
+
+    /**
+     * Returns true if there are more messages to send.
+     *
+     * @return true if there are more messages to send.
+     */
+    boolean hasNextMessage() throws IOException {
+        // check that the stream is not closed.
+        if (closed.get()) {
+            return false;
+        } else {
+            // check that there are more messages to send.
+            // 1. there is a file transfer info message to send.
+            // 2. there are files to send.
+            // 3. there is a current file to send.
+            return fileTransferInfo.get() != null || !filesToSend.isEmpty() || 
(currFile != null && !currFile.isFinished());
+        }
+    }
+
+    /**
+     * Returns the next message to send.
+     *
+     * @return the next message to send.
+     * @throws IOException if an I/O error occurs.
+     * @throws IllegalStateException if there are no more messages to send.
+     */
+    NetworkMessage nextMessage() throws IOException {
+        if (!hasNextMessage()) {
+            throw new IllegalStateException("There are no more messages to 
send.");
+        }
+
+        FileTransferInfo info = fileTransferInfo.getAndSet(null);
+        if (info != null) {
+            return info;
+        } else {
+            if (currFile == null || currFile.isFinished()) {
+                openNextFile();
+                return header();
+            } else {
+                return nextChunk();
+            }
+        }
+    }
+
+    private FileTransferInfo fileTransferInfo() {
+        return FileTransferInfoImpl.builder()
+                .transferId(transferId)
+                .filesCount(filesToSend.size())
+                .build();
+    }
+
+    /**
+     * Returns the header of the current file to send.
+     */
+    private FileHeader header() throws IOException {
+        assert currFile != null : "Current file is null.";
+
+        return FileHeaderImpl.builder()
+                .transferId(transferId)
+                .fileName(currFile.fileName())
+                .fileSize(currFile.length())
+                .build();
+    }
+
+    /**
+     * Returns the next chunk of the current file. Throws an exception if the 
current file is finished.
+     *
+     * @return the next chunk of the current file.
+     * @throws IOException if an I/O error occurs.
+     * @throws IllegalStateException if the current file is finished.
+     */
+    private ChunkedFile nextChunk() throws IOException {
+        assert currFile != null : "Current file is null.";
+        assert !currFile.isFinished() : "Current file is finished.";
+
+        return ChunkedFileImpl.builder()
+                .transferId(transferId)
+                .fileName(currFile.fileName())
+                .offset(currFile.offset())
+                .data(currFile.readNextChunk())
+                .build();
+    }
+
+    private void openNextFile() throws IOException {

Review Comment:
   The method promises to just open next file, but it also closes the previous 
one. It should be renamed (how about `switchToNextFile()`)?



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/messages/ChunkedFile.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file.messages;
+
+import java.util.Comparator;
+import java.util.UUID;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Chunked file.
+ */
+@Transferable(FileTransferringMessageType.CHUNKED_FILE)
+public interface ChunkedFile extends NetworkMessage {

Review Comment:
   I suggest renaming it to `FileChunk`



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileReceiver.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+class FileReceiver implements ManuallyCloseable {
+    private final Path dir;
+    private final AtomicInteger filesCount = new AtomicInteger(-1);
+    private final AtomicInteger filesFinished = new AtomicInteger(0);
+    private final CompletableFuture<Path> result = new CompletableFuture<>();
+    private final Map<String, ChunkedFileWriter> fileNameToWriter = new 
ConcurrentHashMap<>();
+    private final Map<String, Lock> fileNameToLock = new ConcurrentHashMap<>();
+
+    FileReceiver(Path dir) {
+        this.dir = dir;
+    }
+
+    void receive(FileTransferInfo info) {
+        if (result.isDone()) {
+            throw new IllegalStateException("Received file transfer info after 
result is already done.");
+        }
+        filesCount.set(info.filesCount());
+    }
+
+    void receive(FileHeader header) {
+        if (result.isDone()) {
+            throw new IllegalStateException("Received file header after result 
is already done.");
+        }
+        doInLock(header.fileName(), () -> receive0(header));
+    }
+
+    void receive(ChunkedFile chunkedFile) {
+        if (result.isDone()) {
+            throw new IllegalStateException("Received chunked file after 
result is already done.");
+        }
+        doInLock(chunkedFile.fileName(), () -> receive0(chunkedFile));
+    }
+
+    private void receive0(FileHeader header) {
+        try {
+            Path path = Files.createFile(dir.resolve(header.fileName()));

Review Comment:
   If I'm not mistaken, this method is called from a MessagingService incoming 
thread (which is one per Ignite instance), but it can do I/O and/or block on a 
lock. This might cause starvation.
   
   I suggest calling `FileReceiver` methods in an explicit thread pool.



##########
modules/file-transferring/src/main/java/org/apache/ignite/internal/network/file/FileTransferringServiceImpl.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.file.messages.ChunkedFile;
+import org.apache.ignite.internal.network.file.messages.FileDownloadRequest;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.FileDownloadResponse;
+import 
org.apache.ignite.internal.network.file.messages.FileDownloadResponseImpl;
+import org.apache.ignite.internal.network.file.messages.FileHeader;
+import org.apache.ignite.internal.network.file.messages.FileTransferInfo;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferringMessageType;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequest;
+import org.apache.ignite.internal.network.file.messages.FileUploadRequestImpl;
+import org.apache.ignite.internal.network.file.messages.TransferMetadata;
+import org.apache.ignite.internal.util.FilesUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ChannelType;
+import org.apache.ignite.network.MessagingService;
+import org.apache.ignite.network.annotations.Transferable;
+
+/**
+ * Implementation of {@link FileTransferringService}.
+ */
+public class FileTransferringServiceImpl implements FileTransferringService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileTransferringServiceImpl.class);
+
+    private static final int CHUNK_SIZE = 1024 * 1024;
+    private static final int CONCURRENT_REQUESTS = 10;
+    private static final ChannelType FILE_TRANSFERRING_CHANNEL = 
ChannelType.register((short) 1, "FileTransferring");
+
+    /**
+     * Cluster service.
+     */
+    private final MessagingService messagingService;
+
+    private final Path tempDirectory;
+
+    private final FileSender fileSender;
+
+    private final Map<UUID, FileReceiver> transferIdToReceiver = new 
ConcurrentHashMap<>();
+    private final Map<Short, FileProvider<TransferMetadata>> 
metadataToProvider = new ConcurrentHashMap<>();
+    private final Map<Short, FileHandler<TransferMetadata>> metadataToHandler 
= new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param messagingService Messaging service.
+     * @param tempDirectory Temporary directory.
+     */
+    public FileTransferringServiceImpl(MessagingService messagingService, Path 
tempDirectory) {
+        this.messagingService = messagingService;
+        this.tempDirectory = tempDirectory;
+        this.fileSender = new FileSender(CHUNK_SIZE, new 
RateLimiter(CONCURRENT_REQUESTS), (recipientConsistentId, message) -> {
+            return messagingService.send(recipientConsistentId, 
FILE_TRANSFERRING_CHANNEL, message);
+        });
+    }
+
+    @Override
+    public void start() {
+        messagingService.addMessageHandler(FileTransferringMessageType.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof FileDownloadRequest) {
+                        processDownloadRequest((FileDownloadRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof FileUploadRequest) {
+                        processUploadRequest((FileUploadRequest) message);
+                    } else if (message instanceof FileTransferInfo) {
+                        processFileTransferInfo((FileTransferInfo) message);
+                    } else if (message instanceof FileHeader) {
+                        processFileHeader((FileHeader) message);
+                    } else if (message instanceof ChunkedFile) {
+                        processChunkedFile((ChunkedFile) message);
+                    }
+                });
+    }
+
+    private void processUploadRequest(FileUploadRequest message) {
+        createFileReceiver(message.transferId())
+                .thenCompose(receiver -> 
handleFileTransferring(message.transferId(), receiver))
+                .thenCompose(path -> {
+                    return 
metadataToHandler.get(message.metadata().messageType()).handleUpload(message.metadata(),
 path)
+                            .whenComplete((v, e) -> {
+                                if (e != null) {
+                                    LOG.error("Failed to handle file upload. 
Metadata: {}", message.metadata(), e);
+                                }
+
+                                deleteDirectoryIfExists(path);
+                            });
+                });
+    }
+
+    private void processDownloadRequest(FileDownloadRequest message, String 
senderConsistentId, Long correlationId) {
+        
metadataToProvider.get(message.metadata().messageType()).files(message.metadata())
+                .handle((files, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to get files for download. Metadata: 
{}", message.metadata(), e);
+                        // todo send error response
+                        return completedFuture(null);
+                    }
+
+                    if (files.isEmpty()) {
+                        LOG.error("No files found for download. Metadata: {}", 
message.metadata());

Review Comment:
   Why is it an error if a provider returned an empty list of files?



##########
modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java:
##########
@@ -76,6 +76,26 @@ default CompletableFuture<Void> send(ClusterNode recipient, 
NetworkMessage msg)
      */
     CompletableFuture<Void> send(ClusterNode recipient, ChannelType 
channelType, NetworkMessage msg);
 
+    /**
+     * Tries to send the given message via specified channel asynchronously to 
the specific cluster member.
+     *
+     * <p>Guarantees:
+     * <ul>
+     *     <li>Messages send to same receiver will be delivered in the same 
order as they were sent;</li>
+     *     <li>If a message N has been successfully delivered to a member 
implies that all messages to same receiver
+     *     preceding N have also been successfully delivered.</li>
+     * </ul>
+     *
+     * <p>Please note that the guarantees only work for same (sender, 
receiver) pairs. That is, if A sends m1 and m2
+     * to B, then the guarantees are maintained. If, on the other hand, A 
sends m1 to B and m2 to C, then no guarantees
+     * exist.
+     *
+     * @param recipientConsistentId Consistent ID of the recipient of the 
message.
+     * @param msg       Message which should be delivered.
+     * @return Future of the send operation.
+     */
+    CompletableFuture<Void> send(String recipientConsistentId, ChannelType 
channelType, NetworkMessage msg);

Review Comment:
   Documentation about `ChannelType` is missing.
   
   Also, it's not clear what will happen when using old `send()` without this 
parameter. Some default channel wlll be used? No channel will be used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to