PakhomovAlexander commented on code in PR #2390:
URL: https://github.com/apache/ignite-3/pull/2390#discussion_r1286009821


##########
modules/file-transfer/src/integrationTest/java/org/apache/ignite/internal/network/file/ItFileTransferTest.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.network.file.FileAssertions.assertContentEquals;
+import static org.apache.ignite.internal.network.file.FileGenerator.randomFile;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.network.configuration.FileTransferringConfiguration;
+import org.apache.ignite.internal.network.file.TestCluster.Node;
+import org.apache.ignite.internal.network.file.messages.Metadata;
+import org.apache.ignite.internal.network.file.messages.MetadataImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for file transferring.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItFileTransferTest {
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration
+    private FileTransferringConfiguration configuration;
+
+    private TestCluster cluster;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws InterruptedException {
+        cluster = new TestCluster(2, configuration, workDir, testInfo);
+        cluster.startAwait();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        cluster.shutdown();
+    }
+
+    @Test
+    void download() throws IOException {
+        Node node0 = cluster.members.get(0);

Review Comment:
   Could you name `node0` like `sourceNode` and `node1` -> `targetNode`.



##########
modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/matchers/FileContentMatcher.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.matchers;
+
+import static org.hamcrest.CoreMatchers.is;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * {@link TypeSafeMatcher} for matching file content.
+ */
+public class FileContentMatcher extends TypeSafeMatcher<File> {
+
+    /** Matcher to forward the result of the completable future. */
+    private final Matcher<byte[]> matcher;
+
+    private FileContentMatcher(Matcher<byte[]> matcher) {
+        this.matcher = matcher;
+    }
+
+    /**
+     * Creates a matcher for matching file content.
+     *
+     * @param file File to match.
+     * @return Matcher for matching file content.
+     */
+    public static FileContentMatcher hasContent(File file) {
+        try {
+            return new 
FileContentMatcher(is(Files.readAllBytes(file.toPath())));

Review Comment:
   You can use the static method here instead of the constructor. `return 
hasContent(is(Files....))`



##########
modules/file-transfer/src/integrationTest/java/org/apache/ignite/internal/network/file/ItFileTransferTest.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.network.file.FileAssertions.assertContentEquals;
+import static org.apache.ignite.internal.network.file.FileGenerator.randomFile;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import 
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import 
org.apache.ignite.internal.network.configuration.FileTransferringConfiguration;
+import org.apache.ignite.internal.network.file.TestCluster.Node;
+import org.apache.ignite.internal.network.file.messages.Metadata;
+import org.apache.ignite.internal.network.file.messages.MetadataImpl;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for file transferring.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItFileTransferTest {
+
+    @WorkDirectory
+    private Path workDir;
+
+    @InjectConfiguration
+    private FileTransferringConfiguration configuration;
+
+    private TestCluster cluster;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws InterruptedException {
+        cluster = new TestCluster(2, configuration, workDir, testInfo);
+        cluster.startAwait();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        cluster.shutdown();
+    }
+
+    @Test
+    void download() throws IOException {
+        Node node0 = cluster.members.get(0);
+
+        String unit = "unit";
+        String version = "1.0.0";
+        Path unitPath = node0.workDir().resolve(unit).resolve(version);
+        Files.createDirectories(unitPath);
+
+        int chunkSize = configuration.value().chunkSize();
+        File file1 = randomFile(unitPath, 0);
+        File file2 = randomFile(unitPath, chunkSize + 1);
+        File file3 = randomFile(unitPath, chunkSize * 2);
+        File file4 = randomFile(unitPath, chunkSize * 2);
+
+        node0.fileTransferringService().addFileProvider(
+                Metadata.class,
+                req -> completedFuture(List.of(file1, file2, file3, file4))
+        );
+
+        Node node1 = cluster.members.get(1);
+        CompletableFuture<List<File>> downloadedFilesFuture = 
node1.fileTransferringService().download(
+                node0.nodeName(),
+                MetadataImpl.builder().build()
+        );
+
+        assertThat(
+                downloadedFilesFuture.thenAccept(files -> {
+                    assertContentEquals(Set.of(file1, file2, file3, file4), 
new HashSet<>(files));

Review Comment:
   Why do you use sets in the assertion? This might hide duplicates.



##########
modules/file-transfer/build.gradle:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply from: "$rootDir/buildscripts/java-core.gradle"
+apply from: "$rootDir/buildscripts/publishing.gradle"
+apply from: "$rootDir/buildscripts/java-junit5.gradle"
+apply from: "$rootDir/buildscripts/java-integration-test.gradle"
+apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
+
+dependencies {

Review Comment:
   This is a huge module and the README would be usefull.



##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java:
##########
@@ -184,6 +184,11 @@ public CompletableFuture<Void> send(ClusterNode recipient, 
ChannelType channelTy
             return CompletableFuture.completedFuture(null);
         }
 
+        @Override
+        public CompletableFuture<Void> send(String recipientConsistentId, 
ChannelType channelType, NetworkMessage msg) {
+            throw new AssertionError("Not implemented yet");

Review Comment:
   Do we have a ticket for the implementation?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/RateLimiter.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+class RateLimiter {
+    private final int maxConcurrentRequests;
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    RateLimiter(int maxConcurrentRequests) {
+        this.maxConcurrentRequests = maxConcurrentRequests;
+    }
+
+    boolean tryAcquire() {
+        int count = counter.incrementAndGet();
+        if (count <= maxConcurrentRequests) {

Review Comment:
   I think using explicit lock is abetter choice here.



##########
modules/file-transfer/src/integrationTest/java/org/apache/ignite/internal/network/file/TestCluster.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
+import static 
org.apache.ignite.utils.ClusterServiceTestUtils.findLocalAddresses;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import 
org.apache.ignite.internal.network.configuration.FileTransferringConfiguration;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.NodeFinder;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Test cluster.
+ */
+public class TestCluster {
+    /** Members of the cluster. */
+    final List<Node> members;
+
+    /** Latch that is locked until all members are visible in the topology. */
+    private final CountDownLatch startupLatch;
+
+    /** Node finder. */
+    private final NodeFinder nodeFinder;
+
+    private final FileTransferringConfiguration configuration;
+
+    /**
+     * Creates a test cluster with the given amount of members.
+     *
+     * @param numOfNodes Amount of cluster members.
+     * @param testInfo Test info.
+     */
+    TestCluster(int numOfNodes, FileTransferringConfiguration configuration, 
Path workDir, TestInfo testInfo) {
+        this.startupLatch = new CountDownLatch(numOfNodes - 1);
+
+        int initialPort = 3344;
+
+        List<NetworkAddress> addresses = findLocalAddresses(initialPort, 
initialPort + numOfNodes);
+
+        this.nodeFinder = new StaticNodeFinder(addresses);
+        this.configuration = configuration;
+
+        var isInitial = new AtomicBoolean(true);
+
+        this.members = addresses.stream()
+                .map(addr -> startNode(workDir, testInfo, addr, 
isInitial.getAndSet(false)))
+                .collect(toUnmodifiableList());
+    }
+
+    /**
+     * Start cluster node.
+     *
+     * @param testInfo Test info.
+     * @param addr Node address.
+     * @param initial Whether this node is the first one.
+     * @return Started cluster node.
+     */
+    private Node startNode(
+            Path workDir, TestInfo testInfo, NetworkAddress addr, boolean 
initial
+    ) {
+        ClusterService clusterSvc = clusterService(testInfo, addr.port(), 
nodeFinder);
+
+        if (initial) {
+            clusterSvc.topologyService().addEventHandler(new 
TopologyEventHandler() {
+                /** {@inheritDoc} */
+                @Override
+                public void onAppeared(ClusterNode member) {
+                    startupLatch.countDown();
+                }
+            });
+        }
+
+        try {
+            Path nodeDir = Files.createDirectory(workDir.resolve("node-" + 
clusterSvc.nodeName()));
+            FileTransferServiceImpl fileTransferringService = new 
FileTransferServiceImpl(
+                    clusterSvc.nodeName(),
+                    clusterSvc.messagingService(),
+                    configuration,
+                    nodeDir
+            );
+            return new Node(nodeDir, clusterSvc, fileTransferringService);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Starts and waits for the cluster to come up.
+     *
+     * @throws InterruptedException If failed.
+     * @throws AssertionError If the cluster was unable to start in 3 seconds.
+     */
+    void startAwait() throws InterruptedException {
+        members.forEach(Node::start);
+
+        if (!startupLatch.await(3, TimeUnit.SECONDS)) {

Review Comment:
   Why is `3` hardcoded?



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/ChunkedFileReader.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+
+class ChunkedFileReader implements ManuallyCloseable {
+    private final String fileName;
+    private final RandomAccessFile raf;
+    private final int chunkSize;
+
+    private ChunkedFileReader(String fileName, RandomAccessFile raf, int 
chunkSize) {
+        this.fileName = fileName;
+        this.raf = raf;
+        this.chunkSize = chunkSize;
+    }
+
+    static ChunkedFileReader open(File file, int chunkSize) throws IOException 
{
+        RandomAccessFile raf = new RandomAccessFile(file, "r");
+        return new ChunkedFileReader(file.getName(), raf, chunkSize);
+    }
+
+    byte[] readNextChunk() throws IOException {
+        int toRead = (int) Math.min(chunkSize, raf.length() - 
raf.getFilePointer());

Review Comment:
   I think you can use methods `length()` and `offset()`.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileHandler.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.file.messages.Metadata;
+
+/**
+ * Handler for the uploaded file.
+ */
+public interface FileHandler<M extends Metadata> {
+    /**
+     * Handles the uploaded files.
+     *
+     * @param uploadedFiles The temporary files that were uploaded. These 
files will be deleted after the method returns.

Review Comment:
   Metadata parameter description is missed.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileReceiver.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
+import org.apache.ignite.internal.network.file.messages.FileHeaderMessage;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferErrorMessage;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferInfoMessage;
+
+/**
+ * File receiver.
+ */
+class FileReceiver {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileReceiver.class);
+
+    private final ExecutorService executorService;
+
+    private final Map<UUID, FileTransferMessagesHandler> transferIdToHandler = 
new ConcurrentHashMap<>();
+
+    FileReceiver(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
+    FileTransferMessagesHandler registerTransfer(UUID transferId, Path 
handlerDir) {
+        FileTransferMessagesHandler handler = new 
FileTransferMessagesHandler(handlerDir);
+        transferIdToHandler.put(transferId, handler);
+        handler.result().whenComplete((files, throwable) -> 
transferIdToHandler.remove(transferId));

Review Comment:
   Why not use the `cancelTransfer` method? 



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileSender.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.network.NetworkMessage;
+
+class FileSender {
+    private static final IgniteLogger LOG = Loggers.forClass(FileSender.class);
+
+    private final int chunkSize;
+
+    private final RateLimiter rateLimiter;
+
+    private final BiFunction<String, NetworkMessage, CompletableFuture<Void>> 
send;
+
+    private final ExecutorService executorService;
+
+    FileSender(
+            int chunkSize,
+            RateLimiter rateLimiter,
+            BiFunction<String, NetworkMessage, CompletableFuture<Void>> send,
+            ExecutorService executorService) {
+        this.send = send;
+        this.chunkSize = chunkSize;
+        this.rateLimiter = rateLimiter;
+        this.executorService = executorService;
+    }
+
+    /**
+     * Adds files to the queue to be sent to the receiver.
+     */
+    CompletableFuture<Void> send(String receiverConsistentId, UUID id, 
List<File> files) {
+        return CompletableFuture.runAsync(() -> send0(receiverConsistentId, 
id, files), executorService);
+    }
+
+    private void send0(String receiverConsistentId, UUID id, List<File> files) 
{
+        AtomicBoolean interrupted = new AtomicBoolean(false);
+        try (FileTransferMessagesStream stream = new 
FileTransferMessagesStream(id, files, chunkSize)) {
+            while (stream.hasNextMessage() && !interrupted.get()) {
+                if (rateLimiter.tryAcquire()) {
+                    send.apply(receiverConsistentId, stream.nextMessage())
+                            .whenComplete((res, e) -> {
+                                if (e != null) {
+                                    LOG.error("Failed to send message to node: 
{}, transfer id: {}. Exception: {}",
+                                            receiverConsistentId,
+                                            id,
+                                            e
+                                    );
+                                    interrupted.set(true);
+                                }
+                                rateLimiter.release();

Review Comment:
   what if `send.apply` throws an exception? I think we have to use `try-final`



##########
modules/core/src/main/java/org/apache/ignite/lang/ErrorGroups.java:
##########
@@ -400,4 +400,15 @@ public static class Compute {
         /** Class loader error. */
         public static final int CLASS_LOADER_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 2);
     }
+
+    /**
+     * File transfer error group.
+     */
+    public static class FileTransfer {

Review Comment:
   Agreed, move the error to the Nework group, please.



##########
modules/file-transfer/src/main/java/org/apache/ignite/internal/network/file/FileTransferMessagesHandler.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.network.file.messages.FileChunkMessage;
+import org.apache.ignite.internal.network.file.messages.FileHeaderMessage;
+import 
org.apache.ignite.internal.network.file.messages.FileTransferInfoMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+
+class FileTransferMessagesHandler {
+    private final Path dir;

Review Comment:
   we tend to use empty lines when defining class fields.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to