SammyVimes commented on code in PR #1698:
URL: https://github.com/apache/ignite-3/pull/1698#discussion_r1113216703


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link org.apache.ignite.deployment.IgniteDeployment}.
+ */
+public class ItDeploymentUnitTest extends AbstractClusterIntegrationTest {

Review Comment:
   The "It" prefix is for integration tests.



##########
modules/api/src/main/java/org/apache/ignite/deployment/DeploymentUnit.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+
+/**
+ * Deployment unit interface.
+ */
+public interface DeploymentUnit {
+
+    /**
+     * Unit content.
+     *
+     * @return Name of deployment unit.
+     */
+    String unitName();
+
+    /**
+     * Input stream with deployment unit content.
+     *
+     * @return input stream with deployment unit content.
+     */
+    InputStream content();
+
+
+    /**
+     * Create deployment unit from local path.
+     *
+     * @param path Path to local file.
+     * @return Deployment unit based on local file.
+     */
+    static DeploymentUnit fromPath(Path path) {
+        return new DeploymentUnit() {

Review Comment:
   Do we need this implementation in the API module?



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.IgniteDeployment;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import 
org.apache.ignite.internal.deployunit.exception.DeployUnitWriteMetaException;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import 
org.apache.ignite.internal.deployunit.exception.UndeployNotExistedDeploymentUnitException;
+import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Deployment manager implementation.
+ */
+public class DeploymentManagerImpl implements IgniteDeployment, 
IgniteComponent {
+
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
+
+    private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
+
+    private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
+
+    /**
+     * Meta storage.
+     */
+    private final MetaStorageManager metaStorage;
+
+    /**
+     * Deployment configuration.
+     */
+    private final DeploymentConfiguration configuration;
+
+    /**
+     * Cluster management group manager.
+     */
+    private final ClusterManagementGroupManager cmgManager;
+
+    /**
+     * In flight futures tracker.
+     */
+    private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+    /**
+     * Cluster service.
+     */
+    private final ClusterService clusterService;
+
+    /**
+     * Folder for units.
+     */
+    private Path unitsFolder;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param metaStorage Meta storage.
+     * @param workDir Node working directory.
+     * @param configuration Deployment configuration.
+     * @param cmgManager Cluster management group manager.
+     */
+    public DeploymentManagerImpl(ClusterService clusterService,
+            MetaStorageManager metaStorage,
+            Path workDir,
+            DeploymentConfiguration configuration,
+            ClusterManagementGroupManager cmgManager) {
+        this.clusterService = clusterService;
+        this.metaStorage = metaStorage;
+        this.configuration = configuration;
+        this.cmgManager = cmgManager;
+        unitsFolder = workDir;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deploy(String id, Version version, 
DeploymentUnit deploymentUnit) {
+        Set<Operation> operations = new HashSet<>();

Review Comment:
   This can be replaced with `List.of(put(key, UnitMetaSerializer.serialize(meta)))`.



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.IgniteDeployment;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import 
org.apache.ignite.internal.deployunit.exception.DeployUnitWriteMetaException;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import 
org.apache.ignite.internal.deployunit.exception.UndeployNotExistedDeploymentUnitException;
+import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Deployment manager implementation.
+ */
+public class DeploymentManagerImpl implements IgniteDeployment, 
IgniteComponent {
+
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
+
+    private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
+
+    private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
+
+    /**
+     * Meta storage.
+     */
+    private final MetaStorageManager metaStorage;
+
+    /**
+     * Deployment configuration.
+     */
+    private final DeploymentConfiguration configuration;
+
+    /**
+     * Cluster management group manager.
+     */
+    private final ClusterManagementGroupManager cmgManager;
+
+    /**
+     * In flight futures tracker.
+     */
+    private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+    /**
+     * Cluster service.
+     */
+    private final ClusterService clusterService;
+
+    /**
+     * Folder for units.
+     */
+    private Path unitsFolder;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param metaStorage Meta storage.
+     * @param workDir Node working directory.
+     * @param configuration Deployment configuration.
+     * @param cmgManager Cluster management group manager.
+     */
+    public DeploymentManagerImpl(ClusterService clusterService,
+            MetaStorageManager metaStorage,
+            Path workDir,
+            DeploymentConfiguration configuration,
+            ClusterManagementGroupManager cmgManager) {
+        this.clusterService = clusterService;
+        this.metaStorage = metaStorage;
+        this.configuration = configuration;
+        this.cmgManager = cmgManager;
+        unitsFolder = workDir;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deploy(String id, Version version, 
DeploymentUnit deploymentUnit) {
+        Set<Operation> operations = new HashSet<>();
+
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + 
version.render());
+
+        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.unitName(), 
Collections.emptyList());
+
+        operations.add(put(key, UnitMetaSerializer.serialize(meta)));
+
+        DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+
+        try {
+            builder.unitContent(deploymentUnit.content().readAllBytes());
+        } catch (IOException e) {
+            LOG.error("Error to read deployment unit content", e);
+            return CompletableFuture.failedFuture(new 
DeploymentUnitReadException(e));
+        }
+        DeployUnitRequest request = builder
+                .unitName(deploymentUnit.unitName())
+                .id(id)
+                .version(version.render())
+                .build();
+
+        return metaStorage.invoke(notExists(key),
+                        operations, Set.of(Operations.noop()))

Review Comment:
   ```suggestion
                           operations, Collections.emptyList())
   ```



##########
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java:
##########
@@ -82,12 +82,12 @@ public class DefaultMessagingService extends 
AbstractMessagingService {
     private final AtomicLong correlationIdGenerator = new AtomicLong();
 
     /** Executor for outbound messages. */
-    private final ExecutorService outboundExecutor = 
Executors.newSingleThreadExecutor(
+    private final ExecutorService outboundExecutor = 
Executors.newFixedThreadPool(2,
             new NamedThreadFactory("MessagingService-outbound-", LOG)
     );
 
     /** Executor for inbound messages. */
-    private final ExecutorService inboundExecutor = 
Executors.newSingleThreadExecutor(
+    private final ExecutorService inboundExecutor = 
Executors.newFixedThreadPool(2,

Review Comment:
   Why specifically 2?  



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMetaSerializer.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.deployment.version.Version;
+
+/**
+ * Serializer for {@link UnitMeta}.
+ */
+public class UnitMetaSerializer {
+    private static final String SEPARATOR = ";";
+    private static final String LIST_SEPARATOR = ":";
+
+    /**
+     * Constructor.
+     */
+    private UnitMetaSerializer() {
+
+    }
+

Review Comment:
   ```suggestion
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.IgniteDeployment;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import 
org.apache.ignite.internal.deployunit.exception.DeployUnitWriteMetaException;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import 
org.apache.ignite.internal.deployunit.exception.UndeployNotExistedDeploymentUnitException;
+import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Deployment manager implementation.
+ */
+public class DeploymentManagerImpl implements IgniteDeployment, 
IgniteComponent {
+
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
+
+    private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
+
+    private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
+
+    /**
+     * Meta storage.
+     */
+    private final MetaStorageManager metaStorage;
+
+    /**
+     * Deployment configuration.
+     */
+    private final DeploymentConfiguration configuration;
+
+    /**
+     * Cluster management group manager.
+     */
+    private final ClusterManagementGroupManager cmgManager;
+
+    /**
+     * In flight futures tracker.
+     */
+    private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+    /**
+     * Cluster service.
+     */
+    private final ClusterService clusterService;
+
+    /**
+     * Folder for units.
+     */
+    private Path unitsFolder;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param metaStorage Meta storage.
+     * @param workDir Node working directory.
+     * @param configuration Deployment configuration.
+     * @param cmgManager Cluster management group manager.
+     */
+    public DeploymentManagerImpl(ClusterService clusterService,
+            MetaStorageManager metaStorage,
+            Path workDir,
+            DeploymentConfiguration configuration,
+            ClusterManagementGroupManager cmgManager) {
+        this.clusterService = clusterService;
+        this.metaStorage = metaStorage;
+        this.configuration = configuration;
+        this.cmgManager = cmgManager;
+        unitsFolder = workDir;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deploy(String id, Version version, 
DeploymentUnit deploymentUnit) {
+        Set<Operation> operations = new HashSet<>();
+
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + 
version.render());
+
+        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.unitName(), 
Collections.emptyList());
+
+        operations.add(put(key, UnitMetaSerializer.serialize(meta)));
+
+        DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+
+        try {
+            builder.unitContent(deploymentUnit.content().readAllBytes());
+        } catch (IOException e) {
+            LOG.error("Error to read deployment unit content", e);
+            return CompletableFuture.failedFuture(new 
DeploymentUnitReadException(e));
+        }
+        DeployUnitRequest request = builder
+                .unitName(deploymentUnit.unitName())
+                .id(id)
+                .version(version.render())
+                .build();
+
+        return metaStorage.invoke(notExists(key),
+                        operations, Set.of(Operations.noop()))
+                .thenCompose(success -> {
+                    if (success) {
+                        return doDeploy(request);
+                    }
+                    LOG.error("Failed to deploy meta of unit " + id + ":" + 
version);
+                    return CompletableFuture.failedFuture(new 
DeployUnitWriteMetaException());
+                })
+                .thenApply(completed -> {
+                    if (completed) {
+                        startDeployAsyncToCmg(request);
+                    }
+                    return completed;
+                });
+    }
+
+    private void startDeployAsyncToCmg(DeployUnitRequest request) {
+        cmgManager.cmgNodes()
+                .thenApply(nodes -> nodes.stream().map(node -> 
clusterService.topologyService().getByConsistentId(node)).collect(
+                        Collectors.toList()))
+                .thenAccept(clusterNodes -> {
+                    for (ClusterNode clusterNode : clusterNodes) {
+                        inFlightFutures.registerFuture(
+                                clusterService.messagingService()
+                                        .invoke(clusterNode, request, 
Long.MAX_VALUE)
+                                        .thenCompose(message -> {
+                                            Throwable error = 
((DeployUnitResponse) message).error();
+                                            if (error != null) {
+                                                LOG.error("Failed to deploy unit " + request.id() + ":" + request.version()
+                                                        + " to node " + clusterNode, error);
+                                                return 
CompletableFuture.failedFuture(error);
+                                            }
+                                            return 
CompletableFuture.completedFuture(true);
+                                        })
+                        );
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> undeploy(String id, Version version) {
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + version);
+
+        return metaStorage.invoke(exists(key), Operations.remove(key), 
Operations.noop())
+                .thenCompose(success -> {
+                    if (success) {
+                        return cmgManager.logicalTopology();
+                    }
+                    throw new UndeployNotExistedDeploymentUnitException("Unit " + id + " with version "
+                            + version + " doesn't exist.");
+                }).thenApply(logicalTopologySnapshot -> {
+                    for (ClusterNode node : logicalTopologySnapshot.nodes()) {
+                        clusterService.messagingService()
+                                .invoke(node, UndeployUnitRequestImpl.builder()
+                                                .id(id)
+                                                .version(version.render())
+                                                .build(),
+                                        Long.MAX_VALUE);
+                    }
+                    return null;
+                });
+    }
+
+    @Override
+    public CompletableFuture<Set<UnitStatus>> list() {
+        CompletableFuture<Set<UnitStatus>> result = new CompletableFuture<>();
+        Map<String, UnitStatusBuilder> map = new HashMap<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX))
+                .subscribe(new Subscriber<>() {
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta meta = 
UnitMetaSerializer.deserialize(item.value());
+                        map.computeIfAbsent(meta.getId(), UnitStatus::builder)
+                                .append(meta.getVersion(), 
meta.getConsistentIdLocation());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        
result.complete(map.values().stream().map(UnitStatusBuilder::build).collect(Collectors.toSet()));
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public CompletableFuture<Set<Version>> versions(String unitId) {
+        CompletableFuture<Set<Version>> result = new CompletableFuture<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX + unitId))
+                .subscribe(new Subscriber<>() {
+                    private final Set<Version> set = new HashSet<>();
+
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta deserialize = 
UnitMetaSerializer.deserialize(item.value());
+                        set.add(deserialize.getVersion());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        result.complete(set);
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public CompletableFuture<UnitStatus> status(String id) {
+        CompletableFuture<UnitStatus> result = new CompletableFuture<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX + id))
+                .subscribe(new Subscriber<>() {
+                    private final UnitStatusBuilder builder = 
UnitStatus.builder(id);
+                    private final Set<Version> set = new HashSet<>();
+
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta deserialize = 
UnitMetaSerializer.deserialize(item.value());
+                        builder.append(deserialize.getVersion(), 
deserialize.getConsistentIdLocation());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        result.complete(builder.build());
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public void start() {
+        unitsFolder = 
unitsFolder.resolve(configuration.deploymentLocation().value());
+        
clusterService.messagingService().addMessageHandler(DeployUnitMessageTypes.class,
+                (message, senderConsistentId, correlationId) -> {
+
+                    if (message instanceof DeployUnitRequest) {
+                        processDeployRequest((DeployUnitRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof UndeployUnitRequest) {
+                        processUndeployRequest((UndeployUnitRequest) message, 
senderConsistentId, correlationId);
+                    }
+                });
+    }
+
+    private void processDeployRequest(DeployUnitRequest executeRequest, String 
senderConsistentId, long correlationId) {
+        doDeploy(executeRequest).whenComplete((success, throwable) -> {
+            DeployUnitResponseBuilder builder = 
DeployUnitResponseImpl.builder();
+            if (throwable != null) {
+                builder.error(throwable);
+            }
+            clusterService.messagingService().respond(senderConsistentId,
+                    builder.build(), correlationId);
+        });
+    }
+
+    private void processUndeployRequest(UndeployUnitRequest executeRequest, 
String senderConsistentId, long correlationId) {
+        try {
+            Path unitPath = unitsFolder
+                    .resolve(executeRequest.id())
+                    .resolve(executeRequest.version());
+
+            Files.walkFileTree(unitPath, new SimpleFileVisitor<>() {
+                @Override
+                public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) throws IOException {
+                    Files.delete(file);
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, 
IOException exc) throws IOException {
+                    Files.delete(dir);
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+        } catch (IOException e) {
+            LOG.error("Failed to undeploy unit " + executeRequest.id() + ":" + 
executeRequest.version(), e);
+            clusterService.messagingService()
+                    .respond(senderConsistentId, 
UndeployUnitResponseImpl.builder().error(e).build(), correlationId);
+            return;
+        }
+
+        clusterService.messagingService()
+                .respond(senderConsistentId, 
UndeployUnitResponseImpl.builder().build(), correlationId);
+    }
+
+    private CompletableFuture<Boolean> doDeploy(DeployUnitRequest 
executeRequest) {
+        String id = executeRequest.id();
+        String version = executeRequest.version();
+        try {
+            Path unitPath = unitsFolder
+                    .resolve(executeRequest.id())
+                    .resolve(executeRequest.version())
+                    .resolve(executeRequest.unitName());
+            Path unitPathTmp = unitPath.resolveSibling(unitPath.getFileName() 
+ ".tmp");
+            Files.createDirectories(unitPathTmp.getParent());
+            Files.write(unitPathTmp, executeRequest.unitContent(),
+                    StandardOpenOption.CREATE, StandardOpenOption.SYNC, 
StandardOpenOption.TRUNCATE_EXISTING);
+            Files.move(unitPathTmp, unitPath,
+                    StandardCopyOption.ATOMIC_MOVE, 
StandardCopyOption.REPLACE_EXISTING);

Review Comment:
   Also, I think `StandardCopyOption.ATOMIC_MOVE`, etc. should be statically 
imported.



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java:
##########
@@ -82,7 +82,7 @@ public abstract class AbstractClusterIntegrationTest extends 
BaseIgniteAbstractT
 
     /** Work directory. */
     @WorkDirectory
-    protected Path workDir;
+    protected static Path workDir;

Review Comment:
   Hm, this changes the lifecycle of the work directory. Previously it was 
created for each test and cleaned up after each test. Now it will be created 
once before all tests. Are you sure this works OK?



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link org.apache.ignite.deployment.IgniteDeployment}.
+ */
+public class ItDeploymentUnitTest extends AbstractClusterIntegrationTest {
+
+    private static final long REPLICA_TIMEOUT = 10;
+    private static final long SIZE_IN_BYTES = 1024L;
+
+    private static Path dummyFile;
+
+    @BeforeAll
+    public static void generateDummy() throws IOException {
+        OpenOption[] options = {
+                StandardOpenOption.WRITE,
+                StandardOpenOption.CREATE
+        };
+
+        dummyFile = workDir.resolve("dummy.txt");
+
+        try (SeekableByteChannel channel = Files.newByteChannel(dummyFile, 
options)) {
+            channel.position(SIZE_IN_BYTES);
+
+            ByteBuffer buf = ByteBuffer.allocate(4).putInt(2);
+            buf.rewind();
+            channel.write(buf);
+        }
+    }
+
+    @Test
+    public void testDeploy() throws Exception {
+        Unit unit = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit);
+
+        CompletableFuture<Set<UnitStatus>> list = node(2).deployment().list();
+        UnitStatusBuilder builder = 
UnitStatus.builder(unit.id).append(unit.version, 
List.of(unit.deployedNode.name(), cmg.name()));
+        assertThat(list, willBe(Set.of(builder.build())));
+    }
+
+    @Test
+    public void testDeployUndeploy() throws Exception {
+        Unit unit = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+        unit.undeploy();
+
+        CompletableFuture<Set<UnitStatus>> list = node(2).deployment().list();
+        assertThat(list, willBe(Collections.emptySet()));
+    }
+
+    @Test
+    public void testDeployTwoUnits() throws Exception {
+        String id = "test";
+        Unit unit1 = deployAndVerify(id, Version.parseVersion("1.1.0"), 1);
+        Unit unit2 = deployAndVerify(id, Version.parseVersion("1.1.1"), 2);
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit1);
+        waitUnitReplica(cmg, unit2);
+
+        CompletableFuture<UnitStatus> list = node(2).deployment().status(id);
+        UnitStatusBuilder status = UnitStatus.builder(id)
+                .append(unit1.version, List.of(unit1.deployedNode.name(), 
cmg.name()))
+                .append(unit2.version, List.of(unit2.deployedNode.name(), 
cmg.name()));
+        assertThat(list, willBe(status.build()));
+
+        CompletableFuture<Set<Version>> versions = 
node(2).deployment().versions(unit1.id);
+        assertThat(versions, willBe(Set.of(unit1.version, unit2.version)));
+    }
+
+
+    @Test
+    public void testDeployTwoUnitsAndUndeployOne() throws Exception {
+        Unit unit1 = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+        Unit unit2 = deployAndVerify("test", Version.parseVersion("1.1.1"), 2);
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit1);
+        waitUnitReplica(cmg, unit2);
+
+        CompletableFuture<UnitStatus> list = 
node(2).deployment().status(unit2.id);
+        UnitStatusBuilder builder = UnitStatus.builder(unit1.id)
+                .append(unit1.version, List.of(unit1.deployedNode.name(), 
cmg.name()))
+                .append(unit2.version, List.of(unit2.deployedNode.name(), 
cmg.name()));
+        assertThat(list, willBe(builder.build()));
+
+        unit2.undeploy();
+        CompletableFuture<Set<Version>> newVersions = 
node(2).deployment().versions(unit1.id);
+        assertThat(newVersions, willBe(Set.of(unit1.version)));
+    }
+
+
+    private Unit deployAndVerify(String id, Version version, int nodeIndex) 
throws Exception {
+        IgniteImpl entryNode = node(nodeIndex);
+
+        CompletableFuture<Boolean> deploy = entryNode.deployment()
+                .deploy(id, version, DeploymentUnit.fromPath(dummyFile));
+
+        assertThat(deploy.get(), is(true));
+
+        Unit unit = new Unit(entryNode, id, version);
+
+        assertThat(getNodeUnitFile(unit).toFile().exists(), is(true));
+
+        return unit;
+    }
+
+
+    private static Path getNodeUnitFile(Unit unit) {
+        return getNodeUnitFile(unit.deployedNode, unit.id, unit.version);
+    }
+
+    private static Path getNodeUnitFile(IgniteImpl node, String unitId, 
Version unitVersion) {
+        String deploymentFolder = node.nodeConfiguration()
+                .getConfiguration(DeploymentConfiguration.KEY)
+                .deploymentLocation().value();
+        Path resolve = workDir.resolve(node.name()).resolve(deploymentFolder);
+        return resolve.resolve(unitId)
+                .resolve(unitVersion.render())
+                .resolve(dummyFile.getFileName());
+    }
+
+
+    private static void waitUnitReplica(IgniteImpl ignite, Unit unit) throws 
InterruptedException {
+        Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version);
+        IgniteTestUtils.waitForCondition(() -> 
unitPath.toFile().getTotalSpace() == SIZE_IN_BYTES, REPLICA_TIMEOUT);
+    }
+
+    private static void waitUnitClean(IgniteImpl ignite, Unit unit) throws 
InterruptedException {
+        Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version);
+        IgniteTestUtils.waitForCondition(() -> !unitPath.toFile().exists(), 
REPLICA_TIMEOUT);
+    }
+
+    static class Unit {
+        private final IgniteImpl deployedNode;
+        private final String id;
+        private final Version version;
+
+
+        Unit(IgniteImpl deployedNode, String id, Version version) {
+            this.deployedNode = deployedNode;
+            this.id = id;
+            this.version = version;
+        }
+

Review Comment:
   ```suggestion
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMetaSerializer.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.deployment.version.Version;
+
+/**
+ * Serializer for {@link UnitMeta}.
+ */
+public class UnitMetaSerializer {
+    private static final String SEPARATOR = ";";
+    private static final String LIST_SEPARATOR = ":";
+
+    /**
+     * Constructor.
+     */
+    private UnitMetaSerializer() {
+
+    }
+
+
+    /**
+     * Serialize unit meta.
+     *
+     * @param meta Unit meta.
+     * @return Serialized unit meta.
+     */
+    public static byte[] serialize(UnitMeta meta) {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append(meta.getId()).append(SEPARATOR)
+                .append(meta.getVersion().render()).append(SEPARATOR)
+                .append(meta.getUnitName()).append(SEPARATOR);

Review Comment:
   What if `id`, `name` or `version` contains the separator or the list separator?



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.IgniteDeployment;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import 
org.apache.ignite.internal.deployunit.exception.DeployUnitWriteMetaException;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import 
org.apache.ignite.internal.deployunit.exception.UndeployNotExistedDeploymentUnitException;
+import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Deployment manager implementation.
+ */
+public class DeploymentManagerImpl implements IgniteDeployment, 
IgniteComponent {
+
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
+
+    private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
+
+    private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
+
+    /**
+     * Meta storage.
+     */
+    private final MetaStorageManager metaStorage;
+
+    /**
+     * Deployment configuration.
+     */
+    private final DeploymentConfiguration configuration;
+
+    /**
+     * Cluster management group manager.
+     */
+    private final ClusterManagementGroupManager cmgManager;
+
+    /**
+     * In flight futures tracker.
+     */
+    private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+    /**
+     * Cluster service.
+     */
+    private final ClusterService clusterService;
+
+    /**
+     * Folder for units.
+     */
+    private Path unitsFolder;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param metaStorage Meta storage.
+     * @param workDir Node working directory.
+     * @param configuration Deployment configuration.
+     * @param cmgManager Cluster management group manager.
+     */
+    public DeploymentManagerImpl(ClusterService clusterService,
+            MetaStorageManager metaStorage,
+            Path workDir,
+            DeploymentConfiguration configuration,
+            ClusterManagementGroupManager cmgManager) {
+        this.clusterService = clusterService;
+        this.metaStorage = metaStorage;
+        this.configuration = configuration;
+        this.cmgManager = cmgManager;
+        unitsFolder = workDir;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deploy(String id, Version version, 
DeploymentUnit deploymentUnit) {
+        Set<Operation> operations = new HashSet<>();
+
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + 
version.render());
+
+        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.unitName(), 
Collections.emptyList());
+
+        operations.add(put(key, UnitMetaSerializer.serialize(meta)));
+
+        DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+
+        try {
+            builder.unitContent(deploymentUnit.content().readAllBytes());
+        } catch (IOException e) {
+            LOG.error("Error to read deployment unit content", e);
+            return CompletableFuture.failedFuture(new 
DeploymentUnitReadException(e));
+        }
+        DeployUnitRequest request = builder
+                .unitName(deploymentUnit.unitName())
+                .id(id)
+                .version(version.render())
+                .build();
+
+        return metaStorage.invoke(notExists(key),
+                        operations, Set.of(Operations.noop()))
+                .thenCompose(success -> {
+                    if (success) {
+                        return doDeploy(request);
+                    }
+                    LOG.error("Failed to deploy meta of unit " + id + ":" + 
version);
+                    return CompletableFuture.failedFuture(new 
DeployUnitWriteMetaException());
+                })
+                .thenApply(completed -> {
+                    if (completed) {
+                        startDeployAsyncToCmg(request);
+                    }
+                    return completed;
+                });
+    }
+
+    private void startDeployAsyncToCmg(DeployUnitRequest request) {
+        cmgManager.cmgNodes()
+                .thenApply(nodes -> nodes.stream().map(node -> 
clusterService.topologyService().getByConsistentId(node)).collect(
+                        Collectors.toList()))
+                .thenAccept(clusterNodes -> {
+                    for (ClusterNode clusterNode : clusterNodes) {
+                        inFlightFutures.registerFuture(
+                                clusterService.messagingService()
+                                        .invoke(clusterNode, request, 
Long.MAX_VALUE)
+                                        .thenCompose(message -> {
+                                            Throwable error = 
((DeployUnitResponse) message).error();
+                                            if (error != null) {
+                                                LOG.error("Failed to deploy 
unit " + request.id() + ":" + request.version()
+                                                        + " to node " + 
clusterNode, error);
+                                                return 
CompletableFuture.failedFuture(error);
+                                            }
+                                            return 
CompletableFuture.completedFuture(true);
+                                        })
+                        );
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> undeploy(String id, Version version) {
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + version);
+
+        return metaStorage.invoke(exists(key), Operations.remove(key), 
Operations.noop())
+                .thenCompose(success -> {
+                    if (success) {
+                        return cmgManager.logicalTopology();
+                    }
+                    throw new UndeployNotExistedDeploymentUnitException("Unit 
" + id + " with version "
+                            + version + " doesn't exist.");
+                }).thenApply(logicalTopologySnapshot -> {
+                    for (ClusterNode node : logicalTopologySnapshot.nodes()) {
+                        clusterService.messagingService()
+                                .invoke(node, UndeployUnitRequestImpl.builder()
+                                                .id(id)
+                                                .version(version.render())
+                                                .build(),
+                                        Long.MAX_VALUE);
+                    }
+                    return null;
+                });
+    }
+
+    @Override
+    public CompletableFuture<Set<UnitStatus>> list() {
+        CompletableFuture<Set<UnitStatus>> result = new CompletableFuture<>();
+        Map<String, UnitStatusBuilder> map = new HashMap<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX))
+                .subscribe(new Subscriber<>() {
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta meta = 
UnitMetaSerializer.deserialize(item.value());
+                        map.computeIfAbsent(meta.getId(), UnitStatus::builder)
+                                .append(meta.getVersion(), 
meta.getConsistentIdLocation());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        
result.complete(map.values().stream().map(UnitStatusBuilder::build).collect(Collectors.toSet()));
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public CompletableFuture<Set<Version>> versions(String unitId) {
+        CompletableFuture<Set<Version>> result = new CompletableFuture<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX + unitId))
+                .subscribe(new Subscriber<>() {
+                    private final Set<Version> set = new HashSet<>();
+
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta deserialize = 
UnitMetaSerializer.deserialize(item.value());
+                        set.add(deserialize.getVersion());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        result.complete(set);
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public CompletableFuture<UnitStatus> status(String id) {
+        CompletableFuture<UnitStatus> result = new CompletableFuture<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX + id))
+                .subscribe(new Subscriber<>() {
+                    private final UnitStatusBuilder builder = 
UnitStatus.builder(id);
+                    private final Set<Version> set = new HashSet<>();
+
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta deserialize = 
UnitMetaSerializer.deserialize(item.value());
+                        builder.append(deserialize.getVersion(), 
deserialize.getConsistentIdLocation());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        result.complete(builder.build());
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public void start() {
+        unitsFolder = 
unitsFolder.resolve(configuration.deploymentLocation().value());
+        
clusterService.messagingService().addMessageHandler(DeployUnitMessageTypes.class,
+                (message, senderConsistentId, correlationId) -> {
+
+                    if (message instanceof DeployUnitRequest) {
+                        processDeployRequest((DeployUnitRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof UndeployUnitRequest) {
+                        processUndeployRequest((UndeployUnitRequest) message, 
senderConsistentId, correlationId);
+                    }
+                });
+    }
+
+    private void processDeployRequest(DeployUnitRequest executeRequest, String 
senderConsistentId, long correlationId) {
+        doDeploy(executeRequest).whenComplete((success, throwable) -> {
+            DeployUnitResponseBuilder builder = 
DeployUnitResponseImpl.builder();
+            if (throwable != null) {
+                builder.error(throwable);
+            }
+            clusterService.messagingService().respond(senderConsistentId,
+                    builder.build(), correlationId);
+        });
+    }
+
+    private void processUndeployRequest(UndeployUnitRequest executeRequest, 
String senderConsistentId, long correlationId) {
+        try {
+            Path unitPath = unitsFolder
+                    .resolve(executeRequest.id())
+                    .resolve(executeRequest.version());
+
+            Files.walkFileTree(unitPath, new SimpleFileVisitor<>() {
+                @Override
+                public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) throws IOException {
+                    Files.delete(file);
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, 
IOException exc) throws IOException {
+                    Files.delete(dir);
+                    return FileVisitResult.CONTINUE;
+                }
+            });

Review Comment:
   `postVisitDirectory`'s second argument should be rethrown if it's not `null`; 
otherwise it will be silently swallowed.



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.IgniteDeployment;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import 
org.apache.ignite.internal.deployunit.exception.DeployUnitWriteMetaException;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import 
org.apache.ignite.internal.deployunit.exception.UndeployNotExistedDeploymentUnitException;
+import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Deployment manager implementation.
+ */
+public class DeploymentManagerImpl implements IgniteDeployment, 
IgniteComponent {
+
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
+
+    private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
+
+    private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
+
+    /**
+     * Meta storage.
+     */
+    private final MetaStorageManager metaStorage;
+
+    /**
+     * Deployment configuration.
+     */
+    private final DeploymentConfiguration configuration;
+
+    /**
+     * Cluster management group manager.
+     */
+    private final ClusterManagementGroupManager cmgManager;
+
+    /**
+     * In flight futures tracker.
+     */
+    private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+    /**
+     * Cluster service.
+     */
+    private final ClusterService clusterService;
+
+    /**
+     * Folder for units.
+     */
+    private Path unitsFolder;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param metaStorage Meta storage.
+     * @param workDir Node working directory.
+     * @param configuration Deployment configuration.
+     * @param cmgManager Cluster management group manager.
+     */
+    public DeploymentManagerImpl(ClusterService clusterService,
+            MetaStorageManager metaStorage,
+            Path workDir,
+            DeploymentConfiguration configuration,
+            ClusterManagementGroupManager cmgManager) {
+        this.clusterService = clusterService;
+        this.metaStorage = metaStorage;
+        this.configuration = configuration;
+        this.cmgManager = cmgManager;
+        unitsFolder = workDir;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deploy(String id, Version version, 
DeploymentUnit deploymentUnit) {
+        Set<Operation> operations = new HashSet<>();
+
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + 
version.render());
+
+        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.unitName(), 
Collections.emptyList());
+
+        operations.add(put(key, UnitMetaSerializer.serialize(meta)));
+
+        DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+
+        try {
+            builder.unitContent(deploymentUnit.content().readAllBytes());
+        } catch (IOException e) {
+            LOG.error("Error to read deployment unit content", e);
+            return CompletableFuture.failedFuture(new 
DeploymentUnitReadException(e));
+        }
+        DeployUnitRequest request = builder
+                .unitName(deploymentUnit.unitName())
+                .id(id)
+                .version(version.render())
+                .build();
+
+        return metaStorage.invoke(notExists(key),
+                        operations, Set.of(Operations.noop()))
+                .thenCompose(success -> {
+                    if (success) {
+                        return doDeploy(request);
+                    }
+                    LOG.error("Failed to deploy meta of unit " + id + ":" + 
version);
+                    return CompletableFuture.failedFuture(new 
DeployUnitWriteMetaException());
+                })
+                .thenApply(completed -> {
+                    if (completed) {
+                        startDeployAsyncToCmg(request);
+                    }
+                    return completed;
+                });
+    }
+
+    private void startDeployAsyncToCmg(DeployUnitRequest request) {
+        cmgManager.cmgNodes()
+                .thenApply(nodes -> nodes.stream().map(node -> 
clusterService.topologyService().getByConsistentId(node)).collect(
+                        Collectors.toList()))
+                .thenAccept(clusterNodes -> {
+                    for (ClusterNode clusterNode : clusterNodes) {
+                        inFlightFutures.registerFuture(
+                                clusterService.messagingService()
+                                        .invoke(clusterNode, request, 
Long.MAX_VALUE)
+                                        .thenCompose(message -> {
+                                            Throwable error = 
((DeployUnitResponse) message).error();
+                                            if (error != null) {
+                                                LOG.error("Failed to deploy 
unit " + request.id() + ":" + request.version()
+                                                        + " to node " + 
clusterNode, error);
+                                                return 
CompletableFuture.failedFuture(error);
+                                            }
+                                            return 
CompletableFuture.completedFuture(true);
+                                        })
+                        );
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<Void> undeploy(String id, Version version) {
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + version);
+
+        return metaStorage.invoke(exists(key), Operations.remove(key), 
Operations.noop())
+                .thenCompose(success -> {
+                    if (success) {
+                        return cmgManager.logicalTopology();
+                    }
+                    throw new UndeployNotExistedDeploymentUnitException("Unit 
" + id + " with version "
+                            + version + " doesn't exist.");
+                }).thenApply(logicalTopologySnapshot -> {
+                    for (ClusterNode node : logicalTopologySnapshot.nodes()) {
+                        clusterService.messagingService()
+                                .invoke(node, UndeployUnitRequestImpl.builder()
+                                                .id(id)
+                                                .version(version.render())
+                                                .build(),
+                                        Long.MAX_VALUE);
+                    }
+                    return null;
+                });
+    }
+
+    @Override
+    public CompletableFuture<Set<UnitStatus>> list() {
+        CompletableFuture<Set<UnitStatus>> result = new CompletableFuture<>();
+        Map<String, UnitStatusBuilder> map = new HashMap<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX))
+                .subscribe(new Subscriber<>() {
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta meta = 
UnitMetaSerializer.deserialize(item.value());
+                        map.computeIfAbsent(meta.getId(), UnitStatus::builder)
+                                .append(meta.getVersion(), 
meta.getConsistentIdLocation());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        
result.complete(map.values().stream().map(UnitStatusBuilder::build).collect(Collectors.toSet()));
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public CompletableFuture<Set<Version>> versions(String unitId) {
+        CompletableFuture<Set<Version>> result = new CompletableFuture<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX + unitId))
+                .subscribe(new Subscriber<>() {
+                    private final Set<Version> set = new HashSet<>();
+
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta deserialize = 
UnitMetaSerializer.deserialize(item.value());
+                        set.add(deserialize.getVersion());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        result.complete(set);
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public CompletableFuture<UnitStatus> status(String id) {
+        CompletableFuture<UnitStatus> result = new CompletableFuture<>();
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX + id))
+                .subscribe(new Subscriber<>() {
+                    private final UnitStatusBuilder builder = 
UnitStatus.builder(id);
+                    private final Set<Version> set = new HashSet<>();
+
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta deserialize = 
UnitMetaSerializer.deserialize(item.value());
+                        builder.append(deserialize.getVersion(), 
deserialize.getConsistentIdLocation());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        result.complete(builder.build());
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public void start() {
+        unitsFolder = 
unitsFolder.resolve(configuration.deploymentLocation().value());
+        
clusterService.messagingService().addMessageHandler(DeployUnitMessageTypes.class,
+                (message, senderConsistentId, correlationId) -> {
+
+                    if (message instanceof DeployUnitRequest) {
+                        processDeployRequest((DeployUnitRequest) message, 
senderConsistentId, correlationId);
+                    } else if (message instanceof UndeployUnitRequest) {
+                        processUndeployRequest((UndeployUnitRequest) message, 
senderConsistentId, correlationId);
+                    }
+                });
+    }
+
+    private void processDeployRequest(DeployUnitRequest executeRequest, String 
senderConsistentId, long correlationId) {
+        doDeploy(executeRequest).whenComplete((success, throwable) -> {
+            DeployUnitResponseBuilder builder = 
DeployUnitResponseImpl.builder();
+            if (throwable != null) {
+                builder.error(throwable);
+            }
+            clusterService.messagingService().respond(senderConsistentId,
+                    builder.build(), correlationId);
+        });
+    }
+
+    private void processUndeployRequest(UndeployUnitRequest executeRequest, 
String senderConsistentId, long correlationId) {
+        try {
+            Path unitPath = unitsFolder
+                    .resolve(executeRequest.id())
+                    .resolve(executeRequest.version());
+
+            Files.walkFileTree(unitPath, new SimpleFileVisitor<>() {
+                @Override
+                public FileVisitResult visitFile(Path file, 
BasicFileAttributes attrs) throws IOException {
+                    Files.delete(file);
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, 
IOException exc) throws IOException {
+                    Files.delete(dir);
+                    return FileVisitResult.CONTINUE;
+                }
+            });

Review Comment:
   ```suggestion
               IgniteUtils.deleteIfExists(unitPath);
   ```



##########
modules/api/src/main/java/org/apache/ignite/deployment/version/Version.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment.version;
+
+
+/**
+ * Unit version interface. Version implementations should be comparable.
+ */
+public interface Version extends Comparable<Version> {
+

Review Comment:
   ```suggestion
   ```



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/deployment/ItDeploymentUnitTest.java:
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployment;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.OpenOption;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.AbstractClusterIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link org.apache.ignite.deployment.IgniteDeployment}.
+ */
+public class ItDeploymentUnitTest extends AbstractClusterIntegrationTest {
+
+    private static final long REPLICA_TIMEOUT = 10;
+    private static final long SIZE_IN_BYTES = 1024L;
+
+    private static Path dummyFile;
+
+    @BeforeAll
+    public static void generateDummy() throws IOException {
+        OpenOption[] options = {
+                StandardOpenOption.WRITE,
+                StandardOpenOption.CREATE
+        };
+
+        dummyFile = workDir.resolve("dummy.txt");
+
+        try (SeekableByteChannel channel = Files.newByteChannel(dummyFile, 
options)) {
+            channel.position(SIZE_IN_BYTES);
+
+            ByteBuffer buf = ByteBuffer.allocate(4).putInt(2);
+            buf.rewind();
+            channel.write(buf);
+        }
+    }
+
+    @Test
+    public void testDeploy() throws Exception {
+        Unit unit = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit);
+
+        CompletableFuture<Set<UnitStatus>> list = node(2).deployment().list();
+        UnitStatusBuilder builder = 
UnitStatus.builder(unit.id).append(unit.version, 
List.of(unit.deployedNode.name(), cmg.name()));
+        assertThat(list, willBe(Set.of(builder.build())));
+    }
+
+    @Test
+    public void testDeployUndeploy() throws Exception {
+        Unit unit = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+        unit.undeploy();
+
+        CompletableFuture<Set<UnitStatus>> list = node(2).deployment().list();
+        assertThat(list, willBe(Collections.emptySet()));
+    }
+
+    @Test
+    public void testDeployTwoUnits() throws Exception {
+        String id = "test";
+        Unit unit1 = deployAndVerify(id, Version.parseVersion("1.1.0"), 1);
+        Unit unit2 = deployAndVerify(id, Version.parseVersion("1.1.1"), 2);
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit1);
+        waitUnitReplica(cmg, unit2);
+
+        CompletableFuture<UnitStatus> list = node(2).deployment().status(id);
+        UnitStatusBuilder status = UnitStatus.builder(id)
+                .append(unit1.version, List.of(unit1.deployedNode.name(), 
cmg.name()))
+                .append(unit2.version, List.of(unit2.deployedNode.name(), 
cmg.name()));
+        assertThat(list, willBe(status.build()));
+
+        CompletableFuture<Set<Version>> versions = 
node(2).deployment().versions(unit1.id);
+        assertThat(versions, willBe(Set.of(unit1.version, unit2.version)));
+    }
+
+
+    @Test
+    public void testDeployTwoUnitsAndUndeployOne() throws Exception {
+        Unit unit1 = deployAndVerify("test", Version.parseVersion("1.1.0"), 1);
+        Unit unit2 = deployAndVerify("test", Version.parseVersion("1.1.1"), 2);
+
+        IgniteImpl cmg = cluster.node(0);
+        waitUnitReplica(cmg, unit1);
+        waitUnitReplica(cmg, unit2);
+
+        CompletableFuture<UnitStatus> list = 
node(2).deployment().status(unit2.id);
+        UnitStatusBuilder builder = UnitStatus.builder(unit1.id)
+                .append(unit1.version, List.of(unit1.deployedNode.name(), 
cmg.name()))
+                .append(unit2.version, List.of(unit2.deployedNode.name(), 
cmg.name()));
+        assertThat(list, willBe(builder.build()));
+
+        unit2.undeploy();
+        CompletableFuture<Set<Version>> newVersions = 
node(2).deployment().versions(unit1.id);
+        assertThat(newVersions, willBe(Set.of(unit1.version)));
+    }
+
+
+    private Unit deployAndVerify(String id, Version version, int nodeIndex) 
throws Exception {
+        IgniteImpl entryNode = node(nodeIndex);
+
+        CompletableFuture<Boolean> deploy = entryNode.deployment()
+                .deploy(id, version, DeploymentUnit.fromPath(dummyFile));
+
+        assertThat(deploy.get(), is(true));
+
+        Unit unit = new Unit(entryNode, id, version);
+
+        assertThat(getNodeUnitFile(unit).toFile().exists(), is(true));
+
+        return unit;
+    }
+
+
+    private static Path getNodeUnitFile(Unit unit) {
+        return getNodeUnitFile(unit.deployedNode, unit.id, unit.version);
+    }
+
+    private static Path getNodeUnitFile(IgniteImpl node, String unitId, 
Version unitVersion) {
+        String deploymentFolder = node.nodeConfiguration()
+                .getConfiguration(DeploymentConfiguration.KEY)
+                .deploymentLocation().value();
+        Path resolve = workDir.resolve(node.name()).resolve(deploymentFolder);
+        return resolve.resolve(unitId)
+                .resolve(unitVersion.render())
+                .resolve(dummyFile.getFileName());
+    }
+
+
+    private static void waitUnitReplica(IgniteImpl ignite, Unit unit) throws 
InterruptedException {
+        Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version);
+        IgniteTestUtils.waitForCondition(() -> 
unitPath.toFile().getTotalSpace() == SIZE_IN_BYTES, REPLICA_TIMEOUT);
+    }
+
+    private static void waitUnitClean(IgniteImpl ignite, Unit unit) throws 
InterruptedException {
+        Path unitPath = getNodeUnitFile(ignite, unit.id, unit.version);
+        IgniteTestUtils.waitForCondition(() -> !unitPath.toFile().exists(), 
REPLICA_TIMEOUT);
+    }
+
+    static class Unit {
+        private final IgniteImpl deployedNode;
+        private final String id;
+        private final Version version;
+

Review Comment:
   ```suggestion
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.exists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.stream.Collectors;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.deployment.IgniteDeployment;
+import org.apache.ignite.deployment.UnitStatus;
+import org.apache.ignite.deployment.UnitStatus.UnitStatusBuilder;
+import org.apache.ignite.deployment.version.Version;
+import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
+import 
org.apache.ignite.internal.deployunit.configuration.DeploymentConfiguration;
+import 
org.apache.ignite.internal.deployunit.exception.DeployUnitWriteMetaException;
+import 
org.apache.ignite.internal.deployunit.exception.DeploymentUnitReadException;
+import 
org.apache.ignite.internal.deployunit.exception.UndeployNotExistedDeploymentUnitException;
+import org.apache.ignite.internal.deployunit.message.DeployUnitMessageTypes;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponse;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseBuilder;
+import org.apache.ignite.internal.deployunit.message.DeployUnitResponseImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequest;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitRequestImpl;
+import org.apache.ignite.internal.deployunit.message.UndeployUnitResponseImpl;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.metastorage.dsl.Operations;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * Deployment manager implementation.
+ */
+public class DeploymentManagerImpl implements IgniteDeployment, 
IgniteComponent {
+
+    private static final IgniteLogger LOG = 
Loggers.forClass(DeploymentManagerImpl.class);
+
+    private static final String DEPLOY_UNIT_PREFIX = "deploy-unit.";
+
+    private static final String UNITS_PREFIX = DEPLOY_UNIT_PREFIX + "units.";
+
+    /**
+     * Meta storage.
+     */
+    private final MetaStorageManager metaStorage;
+
+    /**
+     * Deployment configuration.
+     */
+    private final DeploymentConfiguration configuration;
+
+    /**
+     * Cluster management group manager.
+     */
+    private final ClusterManagementGroupManager cmgManager;
+
+    /**
+     * In flight futures tracker.
+     */
+    private final InFlightFutures inFlightFutures = new InFlightFutures();
+
+    /**
+     * Cluster service.
+     */
+    private final ClusterService clusterService;
+
+    /**
+     * Folder for units.
+     */
+    private Path unitsFolder;
+
+    /**
+     * Constructor.
+     *
+     * @param clusterService Cluster service.
+     * @param metaStorage Meta storage.
+     * @param workDir Node working directory.
+     * @param configuration Deployment configuration.
+     * @param cmgManager Cluster management group manager.
+     */
+    public DeploymentManagerImpl(ClusterService clusterService,
+            MetaStorageManager metaStorage,
+            Path workDir,
+            DeploymentConfiguration configuration,
+            ClusterManagementGroupManager cmgManager) {
+        this.clusterService = clusterService;
+        this.metaStorage = metaStorage;
+        this.configuration = configuration;
+        this.cmgManager = cmgManager;
+        unitsFolder = workDir;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> deploy(String id, Version version, 
DeploymentUnit deploymentUnit) {
+        Set<Operation> operations = new HashSet<>();
+
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + 
version.render());
+
+        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.unitName(), 
Collections.emptyList());
+
+        operations.add(put(key, UnitMetaSerializer.serialize(meta)));
+
+        DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+
+        try {
+            builder.unitContent(deploymentUnit.content().readAllBytes());
+        } catch (IOException e) {
+            LOG.error("Error to read deployment unit content", e);
+            return CompletableFuture.failedFuture(new 
DeploymentUnitReadException(e));
+        }
+        DeployUnitRequest request = builder
+                .unitName(deploymentUnit.unitName())
+                .id(id)
+                .version(version.render())
+                .build();
+
+        return metaStorage.invoke(notExists(key),
+                        operations, Set.of(Operations.noop()))
+                .thenCompose(success -> {
+                    if (success) {
+                        return doDeploy(request);
+                    }
+                    LOG.error("Failed to deploy meta of unit " + id + ":" + 
version);
+                    return CompletableFuture.failedFuture(new 
DeployUnitWriteMetaException());
+                })
+                .thenApply(completed -> {
+                    if (completed) {
+                        startDeployAsyncToCmg(request);
+                    }
+                    return completed;
+                });
+    }
+
+    private void startDeployAsyncToCmg(DeployUnitRequest request) {
+        cmgManager.cmgNodes()
+                .thenApply(nodes -> nodes.stream().map(node -> 
clusterService.topologyService().getByConsistentId(node)).collect(
+                        Collectors.toList()))
+                .thenAccept(clusterNodes -> {
+                    for (ClusterNode clusterNode : clusterNodes) {
+                        inFlightFutures.registerFuture(
+                                clusterService.messagingService()
+                                        .invoke(clusterNode, request, 
Long.MAX_VALUE)
+                                        .thenCompose(message -> {
+                                            Throwable error = 
((DeployUnitResponse) message).error();
+                                            if (error != null) {
+                                                LOG.error("Failed to deploy 
unit " + request.id() + ":" + request.version()
+                                                        + " to node " + 
clusterNode, error);
+                                                return 
CompletableFuture.failedFuture(error);
+                                            }
+                                            return 
CompletableFuture.completedFuture(true);
+                                        })
+                        );
+                    }
+                });
+    }
+
+    /**
+     * Removes the unit meta from the meta storage and asks every node of the logical topology to undeploy the unit.
+     *
+     * <p>The undeploy requests are sent without waiting for, or inspecting, the responses.
+     *
+     * @param id Unit identifier.
+     * @param version Unit version.
+     * @return Future that completes once the requests have been sent. It completes exceptionally, caused by
+     *      {@link UndeployNotExistedDeploymentUnitException}, when no meta is stored for the given unit and version.
+     */
+    @Override
+    public CompletableFuture<Void> undeploy(String id, Version version) {
+        // The key has to be rendered exactly as in deploy(), which uses version.render() and not version.toString().
+        ByteArray key = new ByteArray(UNITS_PREFIX + id + ":" + version.render());
+
+        return metaStorage.invoke(exists(key), Operations.remove(key), Operations.noop())
+                .thenCompose(success -> {
+                    if (success) {
+                        return cmgManager.logicalTopology();
+                    }
+                    throw new UndeployNotExistedDeploymentUnitException("Unit " + id + " with version "
+                            + version + " doesn't exist.");
+                }).thenApply(logicalTopologySnapshot -> {
+                    for (ClusterNode node : logicalTopologySnapshot.nodes()) {
+                        clusterService.messagingService()
+                                .invoke(node, UndeployUnitRequestImpl.builder()
+                                                .id(id)
+                                                .version(version.render())
+                                                .build(),
+                                        Long.MAX_VALUE);
+                    }
+                    return null;
+                });
+    }
+
+    /**
+     * Collects the statuses of all units found in the meta storage under {@code UNITS_PREFIX}.
+     *
+     * @return Future with one status per unit identifier; it fails when the meta storage publisher reports an error.
+     */
+    @Override
+    public CompletableFuture<Set<UnitStatus>> list() {
+        CompletableFuture<Set<UnitStatus>> statuses = new CompletableFuture<>();
+
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX)).subscribe(new Subscriber<>() {
+            /** Status builders by unit identifier. */
+            private final Map<String, UnitStatusBuilder> builders = new HashMap<>();
+
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                subscription.request(Long.MAX_VALUE);
+            }
+
+            @Override
+            public void onNext(Entry entry) {
+                UnitMeta unitMeta = UnitMetaSerializer.deserialize(entry.value());
+
+                // All versions of the same unit are accumulated in a single builder.
+                UnitStatusBuilder builder = builders.computeIfAbsent(unitMeta.getId(), UnitStatus::builder);
+
+                builder.append(unitMeta.getVersion(), unitMeta.getConsistentIdLocation());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                statuses.completeExceptionally(throwable);
+            }
+
+            @Override
+            public void onComplete() {
+                Set<UnitStatus> built = builders.values().stream()
+                        .map(UnitStatusBuilder::build)
+                        .collect(Collectors.toSet());
+
+                statuses.complete(built);
+            }
+        });
+
+        return statuses;
+    }
+
+    /**
+     * Collects all versions of the given unit that are present in the meta storage.
+     *
+     * @param unitId Unit identifier.
+     * @return Future with the set of versions; it fails when the meta storage publisher reports an error.
+     */
+    @Override
+    public CompletableFuture<Set<Version>> versions(String unitId) {
+        CompletableFuture<Set<Version>> result = new CompletableFuture<>();
+
+        // Keys have the form UNITS_PREFIX + id + ":" + version, so the separator is made part of the prefix: without it
+        // units whose identifier merely starts with unitId would be matched as well.
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX + unitId + ":"))
+                .subscribe(new Subscriber<>() {
+                    private final Set<Version> set = new HashSet<>();
+
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta deserialize = UnitMetaSerializer.deserialize(item.value());
+                        set.add(deserialize.getVersion());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        result.complete(set);
+                    }
+                });
+        return result;
+    }
+
+    /**
+     * Builds the status of the given unit from all of its versions present in the meta storage.
+     *
+     * @param id Unit identifier.
+     * @return Future with the unit status; it fails when the meta storage publisher reports an error.
+     */
+    @Override
+    public CompletableFuture<UnitStatus> status(String id) {
+        CompletableFuture<UnitStatus> result = new CompletableFuture<>();
+
+        // Keys have the form UNITS_PREFIX + id + ":" + version, so the separator is made part of the prefix: without it
+        // units whose identifier merely starts with id would be folded into this status as well.
+        metaStorage.prefix(new ByteArray(UNITS_PREFIX + id + ":"))
+                .subscribe(new Subscriber<>() {
+                    private final UnitStatusBuilder builder = UnitStatus.builder(id);
+
+                    @Override
+                    public void onSubscribe(Subscription subscription) {
+                        subscription.request(Long.MAX_VALUE);
+                    }
+
+                    @Override
+                    public void onNext(Entry item) {
+                        UnitMeta deserialize = UnitMetaSerializer.deserialize(item.value());
+                        builder.append(deserialize.getVersion(), deserialize.getConsistentIdLocation());
+                    }
+
+                    @Override
+                    public void onError(Throwable throwable) {
+                        result.completeExceptionally(throwable);
+                    }
+
+                    @Override
+                    public void onComplete() {
+                        result.complete(builder.build());
+                    }
+                });
+        return result;
+    }
+
+    /**
+     * Resolves the units folder against the configured deployment location and registers the handler that dispatches
+     * incoming deploy and undeploy requests.
+     */
+    @Override
+    public void start() {
+        unitsFolder = unitsFolder.resolve(configuration.deploymentLocation().value());
+
+        clusterService.messagingService().addMessageHandler(
+                DeployUnitMessageTypes.class,
+                (message, senderConsistentId, correlationId) -> {
+                    if (message instanceof DeployUnitRequest) {
+                        processDeployRequest((DeployUnitRequest) message, senderConsistentId, correlationId);
+
+                        return;
+                    }
+
+                    // Messages of any other type are ignored.
+                    if (message instanceof UndeployUnitRequest) {
+                        processUndeployRequest((UndeployUnitRequest) message, senderConsistentId, correlationId);
+                    }
+                }
+        );
+    }
+
+    /**
+     * Handles a deploy request received from another node and answers the sender.
+     *
+     * <p>A response is sent in any case; it carries an error only when {@code doDeploy} completed exceptionally.
+     *
+     * @param executeRequest Received deploy request.
+     * @param senderConsistentId Consistent identifier of the sender.
+     * @param correlationId Correlation identifier used for the response.
+     */
+    private void processDeployRequest(DeployUnitRequest executeRequest, String senderConsistentId, long correlationId) {
+        doDeploy(executeRequest).whenComplete((deployed, failure) -> {
+            DeployUnitResponseBuilder response = DeployUnitResponseImpl.builder();
+
+            if (failure != null) {
+                response.error(failure);
+            }
+
+            clusterService.messagingService().respond(senderConsistentId, response.build(), correlationId);
+        });
+    }
+
+    /**
+     * Handles an undeploy request received from another node: recursively deletes the directory of the requested unit
+     * version and answers the sender.
+     *
+     * <p>When the deletion fails with an {@link IOException}, the error is logged and returned in the response.
+     *
+     * @param executeRequest Received undeploy request.
+     * @param senderConsistentId Consistent identifier of the sender.
+     * @param correlationId Correlation identifier used for the response.
+     */
+    private void processUndeployRequest(UndeployUnitRequest executeRequest, String senderConsistentId, long correlationId) {
+        Path unitPath = unitsFolder
+                .resolve(executeRequest.id())
+                .resolve(executeRequest.version());
+
+        // Files are deleted as they are visited; each directory is deleted after its content has been visited.
+        SimpleFileVisitor<Path> recursiveDelete = new SimpleFileVisitor<>() {
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                Files.delete(file);
+
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                Files.delete(dir);
+
+                return FileVisitResult.CONTINUE;
+            }
+        };
+
+        try {
+            Files.walkFileTree(unitPath, recursiveDelete);
+        } catch (IOException e) {
+            LOG.error("Failed to undeploy unit " + executeRequest.id() + ":" + executeRequest.version(), e);
+
+            clusterService.messagingService()
+                    .respond(senderConsistentId, UndeployUnitResponseImpl.builder().error(e).build(), correlationId);
+
+            return;
+        }
+
+        clusterService.messagingService()
+                .respond(senderConsistentId, UndeployUnitResponseImpl.builder().build(), correlationId);
+    }
+
+    private CompletableFuture<Boolean> doDeploy(DeployUnitRequest 
executeRequest) {
+        String id = executeRequest.id();
+        String version = executeRequest.version();
+        try {
+            Path unitPath = unitsFolder
+                    .resolve(executeRequest.id())
+                    .resolve(executeRequest.version())
+                    .resolve(executeRequest.unitName());
+            Path unitPathTmp = unitPath.resolveSibling(unitPath.getFileName() 
+ ".tmp");
+            Files.createDirectories(unitPathTmp.getParent());
+            Files.write(unitPathTmp, executeRequest.unitContent(),
+                    StandardOpenOption.CREATE, StandardOpenOption.SYNC, 
StandardOpenOption.TRUNCATE_EXISTING);

Review Comment:
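   `StandardOpenOption.DSYNC` should be enough here instead of `SYNC`: only the file content needs to be written synchronously, not the file metadata.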



##########
modules/api/src/main/java/org/apache/ignite/deployment/version/Version.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment.version;
+
+
+/**
+ * Unit version interface. Version implementations should be comparable.
+ */
+public interface Version extends Comparable<Version> {
+
+    /**
+     * Render version representation in String format.
+     *
+     * @return version string representation.
+     */
+    String render();
+
+    static Version latest() {

Review Comment:
   Maybe just use `LATEST`?



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitMeta.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.deployment.version.Version;
+
+/**
+ * Unit meta data class.
+ */
+public class UnitMeta {
+

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to