PakhomovAlexander commented on code in PR #1809:
URL: https://github.com/apache/ignite-3/pull/1809#discussion_r1141849235


##########
modules/api/src/main/java/org/apache/ignite/deployment/DeploymentInfo.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Data class with deployment unit information.
+ */
+public class DeploymentInfo {
+    private final DeploymentStatus status;
+
+    private final List<String> consistentIds;
+
+    public DeploymentInfo(DeploymentStatus status, List<String> consistentIds) 
{

Review Comment:
   ```suggestion
       private DeploymentInfo(DeploymentStatus status, List<String> 
consistentIds) {
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeployTracker.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static org.apache.ignite.internal.deployunit.key.UnitKey.key;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.deployment.version.Version;
+import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Deploy actions tracker.
+ */
+public class DeployTracker {
+    /**
+     * In flight futures tracker.
+     */
+    private final Map<ByteArray, InFlightFutures> inFlightFutures = new 
ConcurrentHashMap<>();
+
+    /**
+     * Track deploy action.
+     *
+     * @param <T> Future result type.
+     * @param id Deployment unit identifier.
+     * @param version Deployment unit version.
+     * @param trackableAction Deploy action.
+     * @return {@param trackableAction}.
+     */
+    public <T> CompletableFuture<T> track(String id, Version version, 
CompletableFuture<T> trackableAction) {
+        return inFlightFutures.computeIfAbsent(key(id, version), k -> new 
InFlightFutures()).registerFuture(trackableAction);

Review Comment:
   ```suggestion
           return inFlightFutures.putIfAbsent(key(id, version), new 
InFlightFutures()).registerFuture(trackableAction);
   ```



##########
modules/api/src/main/java/org/apache/ignite/deployment/DeploymentInfo.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Data class with deployment unit information.
+ */
+public class DeploymentInfo {
+    private final DeploymentStatus status;
+
+    private final List<String> consistentIds;
+
+    public DeploymentInfo(DeploymentStatus status, List<String> consistentIds) 
{
+        this.status = status;
+        this.consistentIds = Collections.unmodifiableList(consistentIds);
+    }
+
+    public DeploymentStatus status() {
+        return status;
+    }
+
+    public List<String> consistentIds() {
+        return consistentIds;
+    }
+
+    public static DeploymentInfoBuilder builder() {
+        return new DeploymentInfoBuilder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DeploymentInfo that = (DeploymentInfo) o;
+
+        return status == that.status
+                && (consistentIds != null ? 
consistentIds.equals(that.consistentIds) : that.consistentIds == null);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = status != null ? status.hashCode() : 0;
+        result = 31 * result + (consistentIds != null ? 
consistentIds.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "DeploymentInfo{"
+                + "status=" + status
+                + ", consistentIds=[" + String.join(", ", consistentIds) + "]"
+                + '}';
+    }
+
+    /**
+     * Builder for {@link DeploymentInfo}.
+     */
+    public static final class DeploymentInfoBuilder {
+        private DeploymentStatus status;
+        private final List<String> consistentIds = new ArrayList<>();
+
+        public DeploymentInfoBuilder status(DeploymentStatus status) {
+            this.status = status;
+            return this;
+        }
+
+        public DeploymentInfoBuilder addConsistentId(String consistentId) {
+            consistentIds.add(consistentId);
+            return this;
+        }
+
+        public DeploymentInfo build() {
+            return new DeploymentInfo(status, consistentIds);

Review Comment:
   Could you add a null check for status here?



##########
modules/api/src/main/java/org/apache/ignite/deployment/UnitStatus.java:
##########
@@ -123,7 +129,7 @@ public String toString() {
     public static class UnitStatusBuilder {
 
         private final String id;
-        private final Map<Version, List<String>> versionToConsistentIds = new 
HashMap<>();
+        private final Map<Version, DeploymentInfo> versionToInfoBuilders = new 
HashMap<>();

Review Comment:
   Why does the field name end in "Builders"? The map values are 
`DeploymentInfo` instances, not builders.



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/FileDeployerService.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.deployunit;
+
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.SYNC;
+import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Service for file deploying on local File System.
+ */
+public class FileDeployerService {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FileDeployerService.class);
+
+    private static final String TMP_SUFFIX = ".tmp";
+
+    /**
+     * Folder for units.
+     */
+    private Path unitsFolder;
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(4, 
new NamedThreadFactory("deployment", LOG));

Review Comment:
   The pool size could be extracted into a static constant. Also, why do we use 
a fixed thread pool? Code deployment might not be used at all, yet we would 
still keep 4 idle threads around.



##########
modules/api/src/main/java/org/apache/ignite/deployment/DeploymentStatus.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.deployment;
+
+/**
+ * Status of deployment process.
+ */
+public enum DeploymentStatus {
+    /** Unit deployment is in progress. */
+    UPLOADING,
+
+    /** Unit is deployed on the cluster. */
+    DEPLOYED,
+
+    /** Remove command was initiated for the unit and it will be removed soon. 
*/
+

Review Comment:
   ```suggestion
   ```



##########
modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/DeploymentManagerImpl.java:
##########
@@ -140,107 +116,76 @@ public CompletableFuture<Boolean> deployAsync(String id, 
Version version, Deploy
         Objects.requireNonNull(version);
         Objects.requireNonNull(deploymentUnit);
 
-        ByteArray key = key(id, version.render());
-        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(), 
Collections.emptyList());
-
-        Operation put = put(key, UnitMetaSerializer.serialize(meta));
-
-        DeployUnitRequestBuilder builder = DeployUnitRequestImpl.builder();
+        UnitMeta meta = new UnitMeta(id, version, deploymentUnit.name(), 
DeploymentStatus.UPLOADING, Collections.emptyList());
 
+        byte[] unitContent;
         try {
-            builder.unitContent(deploymentUnit.content().readAllBytes());
+            unitContent = deploymentUnit.content().readAllBytes();
         } catch (IOException e) {
             LOG.error("Error to read deployment unit content", e);
             return CompletableFuture.failedFuture(new 
DeploymentUnitReadException(e));
         }
-        DeployUnitRequest request = builder
-                .unitName(deploymentUnit.name())
-                .id(id)
-                .version(version.render())
-                .build();
 
-        return metaStorage.invoke(notExists(key), put, Operations.noop())
+        CompletableFuture<Boolean> result = metastore.putIfNotExist(id, 
version, meta)
                 .thenCompose(success -> {
                     if (success) {
-                        return doDeploy(request);
+                        return deployer.deploy(id, version.render(), 
deploymentUnit.name(), unitContent);
                     }
                     LOG.error("Failed to deploy meta of unit " + id + ":" + 
version);
                     return CompletableFuture.failedFuture(
                             new DeploymentUnitAlreadyExistsException(id,
                                     "Unit " + id + ":" + version + " already 
exists"));
                 })
+                .thenCompose(deployed -> {
+                    if (deployed) {
+                        return metastore.updateMeta(id, version,
+                                meta1 -> 
meta1.addConsistentId(clusterService.topologyService().localMember().name()));
+                    }
+                    return CompletableFuture.completedFuture(false);
+                })
                 .thenApply(completed -> {
                     if (completed) {
-                        startDeployAsyncToCmg(request);
+                        messaging.startDeployAsyncToCmg(id, version, 
deploymentUnit.name(), unitContent)
+                                .thenAccept(v -> metastore.updateMeta(id, 
version, meta1 -> meta1.updateStatus(DEPLOYED)));
                     }
                     return completed;
                 });
-    }
-
-    private void startDeployAsyncToCmg(DeployUnitRequest request) {
-        cmgManager.cmgNodes()
-                .thenAccept(nodes -> {
-                    for (String node : nodes) {
-                        ClusterNode clusterNode = 
clusterService.topologyService().getByConsistentId(node);
-                        if (clusterNode != null) {
-                            
inFlightFutures.registerFuture(requestDeploy(clusterNode, request));
-                        }
-                    }
-                });
-    }
-
-    private CompletableFuture<Boolean> requestDeploy(ClusterNode clusterNode, 
DeployUnitRequest request) {
-        return clusterService.messagingService()
-                .invoke(clusterNode, request, Long.MAX_VALUE)
-                .thenCompose(message -> {
-                    Throwable error = ((DeployUnitResponse) message).error();
-                    if (error != null) {
-                        LOG.error("Failed to deploy unit " + request.id() + 
":" + request.version()
-                                + " to node " + clusterNode, error);
-                        return CompletableFuture.failedFuture(error);
-                    }
-                    return CompletableFuture.completedFuture(true);
-                });
+        return tracker.track(id, version, result);
     }
 
     @Override
     public CompletableFuture<Void> undeployAsync(String id, Version version) {
         checkId(id);
         Objects.requireNonNull(version);
 
-        ByteArray key = key(id, version.render());
-
-        return metaStorage.invoke(exists(key), Operations.remove(key), 
Operations.noop())
+        return messaging.stopInProgressDeploy(id, version)
+                .thenCompose(v -> metastore.updateMeta(id, version, meta -> 
meta.updateStatus(OBSOLETE)))
+                .thenCompose(success -> {
+                    if (success) {
+                        //TODO: Check unit usages here. If unit used in 
compute task we cannot just remove it.

Review Comment:
   Could you link the tracking issue for this TODO here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to