tkalkirill commented on code in PR #7156:
URL: https://github.com/apache/ignite-3/pull/7156#discussion_r2609785008


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java:
##########
@@ -307,10 +310,13 @@ interface DisasterRecoveryMessages {
         /** Message type for {@link LocalTablePartitionStateResponse}. */
         short LOCAL_TABLE_PARTITION_STATE_RESPONSE = 105;
 
-        /** Message type for disaster recovery request forwarding. */
+        /** Message type for {@link DisasterRecoveryRequestMessage}. */
         short DISASTER_RECOVERY_REQUEST = 106;
 
-        /** Message type for disaster recovery request forwarding response. */
+        /** Message type for {@link DisasterRecoveryResponseMessage}. */
         short DISASTER_RECOVERY_RESPONSE = 111;
+
+        /** Message type for {@link OperationCompletedMessage}. */
+        short DISASTER_RECOVERY_OPERATION_COMPLETED = 112;

Review Comment:
   Let's rename to `OPERATION_COMPLETED`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteOperationException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import org.apache.ignite.lang.ErrorGroups.DisasterRecovery;
+
+/** Exception is thrown when remote node encounters an error during disaster recovery processing. */

Review Comment:
   Let's change to something like: "Exception is thrown when remote node 
encounters an error while executing a disaster recovery operation."



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/OperationCompletedMessage.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.jetbrains.annotations.Nullable;
+
+/** Notifies that disaster recovery operation was completed. */

Review Comment:
   The documentation is insufficient. Please add a few words about the purpose 
of the message and about who sends and receives it.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryRequest.java:
##########
@@ -38,6 +39,12 @@ interface DisasterRecoveryRequest {
     /** Returns request type. */
     DisasterRecoveryRequestType type();
 
+    /**
+     * For multi node requests returns names of nodes involved in the recovery or empty list if all nodes should be used. For single node

Review Comment:
   The documentation has me confused. 
   First, you return a set, but you mention a list. 
   Second, why do we need this method for a single-node query if an empty list 
means all nodes?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/exceptions/RemoteOperationException.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster.exceptions;
+
+import org.apache.ignite.lang.ErrorGroups.DisasterRecovery;
+
+/** Exception is thrown when remote node encounters an error during disaster recovery processing. */
+public class RemoteOperationException extends DisasterRecoveryException {
+    private static final long serialVersionUID = 1L;
+
+    /** Constructor. */
+    public RemoteOperationException(String message, String nodeName) {
+        super(

Review Comment:
   Technically, you could have called 
`org.apache.ignite.internal.lang.IgniteInternalException#IgniteInternalException(int, java.lang.String, java.lang.Object...)` 
directly; that would look more elegant to me.
   I'm not insisting.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/OperationCompletedMessage.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.jetbrains.annotations.Nullable;
+
+/** Notifies that disaster recovery operation was completed. */
+@Transferable(DisasterRecoveryMessages.DISASTER_RECOVERY_OPERATION_COMPLETED)
+public interface OperationCompletedMessage extends NetworkMessage {
+    /** ID of the completed operation. */
+    UUID operationId();
+
+    @Nullable String exceptionMessage();

Review Comment:
   Please add documentation, including what a `null` value means.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1132,6 +1161,17 @@ private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest reques
 
         CompletableFuture<Void> operationFuture = new CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> remoteProcessingFuture = request.type() == DisasterRecoveryRequestType.SINGLE_NODE

Review Comment:
   This is hard to read; it needs to be simplified.
   Let's make it a separate method and simplify it a bit.
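
   A minimal sketch of what such an extracted method could look like, not the
   PR's actual code. `RequestType.MULTI_NODE`, the class name, and passing
   `addMultiNodeOperation` as a function parameter are assumptions made to keep
   the example self-contained; the cleanup in `whenComplete` is left out.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Hypothetical holder for the extracted method; names are illustrative only. */
class RemoteProcessingSketch {
    /** Stand-in for DisasterRecoveryRequestType; MULTI_NODE is an assumed name. */
    enum RequestType { SINGLE_NODE, MULTI_NODE }

    /**
     * Returns a future that completes when every involved node has reported completion.
     * Single-node requests have nothing to wait for, so they get an already completed future.
     */
    static CompletableFuture<Void> remoteProcessingFuture(
            RequestType type,
            Set<String> nodeNames,
            UUID operationId,
            BiFunction<String, UUID, CompletableFuture<Void>> addMultiNodeOperation
    ) {
        if (type == RequestType.SINGLE_NODE) {
            return CompletableFuture.completedFuture(null);
        }

        // One tracked future per involved node.
        CompletableFuture<?>[] perNodeFutures = nodeNames.stream()
                .map(nodeName -> addMultiNodeOperation.apply(nodeName, operationId))
                .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(perNodeFutures);
    }
}
```

   The early return keeps the single-node case out of the stream pipeline, which
   is the part that made the original ternary hard to scan.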



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/MultiNodeOperations.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.table.distributed.disaster.exceptions.RemoteOperationException;
+
+/** Contains operations that should be processed by remote node. */
+class MultiNodeOperations {
+    private final Map<UUID, CompletableFuture<Void>> operationsById = new ConcurrentHashMap<>();
+
+    /** Adds new operation to track. */
+    void add(UUID operationId, CompletableFuture<Void> operationFuture) {
+        operationsById.put(operationId, operationFuture);
+    }
+
+    /**
+     * Removes operation tracking.
+     *
+     * @return Removed operation future.
+     */
+    CompletableFuture<Void> remove(UUID operationId) {
+        return operationsById.remove(operationId);
+    }
+
+    /** Completes all tracked operations with a given exception. */
+    void exceptionally(String nodeName, Throwable e) {

Review Comment:
   Maybe rename to `completeExceptionallyAll`?
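
   For illustration, a sketch of the tracker with the proposed name. The method
   body is not visible in the hunk, so the implementation below is an
   assumption; `RuntimeException` stands in for `RemoteOperationException`, and
   `trackedCount` is a hypothetical helper added only for the example.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only; mirrors the shape of MultiNodeOperations with the renamed method. */
class MultiNodeOperationsRenameSketch {
    private final Map<UUID, CompletableFuture<Void>> operationsById = new ConcurrentHashMap<>();

    /** Adds new operation to track. */
    void add(UUID operationId, CompletableFuture<Void> operationFuture) {
        operationsById.put(operationId, operationFuture);
    }

    /** Completes all tracked operations with a given exception and stops tracking them. */
    void completeExceptionallyAll(String nodeName, Throwable e) {
        // Assumed wrapping; the PR would use RemoteOperationException here.
        RuntimeException wrapped = new RuntimeException("Operation failed on node " + nodeName, e);

        // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
        for (UUID operationId : operationsById.keySet()) {
            CompletableFuture<Void> future = operationsById.remove(operationId);

            if (future != null) {
                future.completeExceptionally(wrapped);
            }
        }
    }

    /** Hypothetical helper used only by the example. */
    int trackedCount() {
        return operationsById.size();
    }
}
```

   The new name follows `CompletableFuture#completeExceptionally`, so the call
   site reads as an action rather than as a callback registration.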



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1132,6 +1161,17 @@ private CompletableFuture<Void> processNewRequest(DisasterRecoveryRequest reques
 
         CompletableFuture<Void> operationFuture = new CompletableFuture<Void>().orTimeout(TIMEOUT_SECONDS, TimeUnit.SECONDS);
 
+        CompletableFuture<Void> remoteProcessingFuture = request.type() == DisasterRecoveryRequestType.SINGLE_NODE
+                ? nullCompletedFuture()
+                : allOf(getNodeNames(request.nodeNames())
+                        .stream()
+                        .map(nodeName -> addMultiNodeOperation(nodeName, operationId))
+                        .toArray(CompletableFuture[]::new))
+                        .whenComplete((ignored, e) -> {
+                            operationsByNodeName.values().forEach(operations -> operations.remove(operationId));

Review Comment:
   You can iterate over `nodeNames` and use `compute` on the map here.
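
   One possible reading of this suggestion, as a sketch: touch only the entries
   of the nodes involved in the request, and let `Map#compute` drop an entry
   once it becomes empty. The nested-map layout, the class, and the method names
   are assumptions for the example; the PR keeps a `MultiNodeOperations` object
   per node instead.

```java
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only; a plain nested map stands in for operationsByNodeName. */
class OperationCleanupSketch {
    /** Node name -> (operation ID -> future). */
    final Map<String, Map<UUID, CompletableFuture<Void>>> operationsByNodeName = new ConcurrentHashMap<>();

    /** Starts tracking an operation for the given node and returns its future. */
    CompletableFuture<Void> track(String nodeName, UUID operationId) {
        CompletableFuture<Void> future = new CompletableFuture<>();

        operationsByNodeName.compute(nodeName, (name, operations) -> {
            if (operations == null) {
                operations = new ConcurrentHashMap<>();
            }

            operations.put(operationId, future);

            return operations;
        });

        return future;
    }

    /** Stops tracking the operation, visiting only the nodes that took part in it. */
    void untrack(Collection<String> nodeNames, UUID operationId) {
        for (String nodeName : nodeNames) {
            operationsByNodeName.compute(nodeName, (name, operations) -> {
                if (operations == null) {
                    return null;
                }

                operations.remove(operationId);

                // Returning null removes the mapping, so empty per-node maps do not accumulate.
                return operations.isEmpty() ? null : operations;
            });
        }
    }
}
```

   Because `compute` runs atomically per key on a `ConcurrentHashMap`, the
   remove-then-prune step cannot race with a concurrent `track` call for the
   same node.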



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1272,6 +1340,16 @@ private void handleTriggerKeyUpdate(WatchEvent watchEvent) {
                                 copyStateTo(operationFuture).accept(res, ex);
                             }
 
+                            for (String node : getNodeNames(request.nodeNames())) {

Review Comment:
   I don't quite understand: we should send the message only to the initiator, 
so why send it to everyone?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1446,6 +1526,24 @@ private void handleLocalPartitionStatesRequest(
         }, threadPool);
     }
 
+    private void handleOperationCompletedMessage(
+            OperationCompletedMessage message,
+            InternalClusterNode sender
+    ) {
+        MultiNodeOperations multiNodeOperations = operationsByNodeName.get(sender.name());
+        if (multiNodeOperations != null) {
+            CompletableFuture<Void> operationFuture = multiNodeOperations.remove(message.operationId());

Review Comment:
   I would suggest moving this logic into a `MultiNodeOperations` method.
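
   A sketch of how that could look. The rest of the handler is not shown in the
   hunk, so this assumes that a `null` `exceptionMessage` means the remote
   operation succeeded; the `complete` method name and its signature are
   hypothetical, and `RuntimeException` stands in for
   `RemoteOperationException`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only; shows the remove-and-complete step living inside the tracker. */
class MultiNodeOperationsCompleteSketch {
    private final Map<UUID, CompletableFuture<Void>> operationsById = new ConcurrentHashMap<>();

    /** Adds new operation to track. */
    void add(UUID operationId, CompletableFuture<Void> operationFuture) {
        operationsById.put(operationId, operationFuture);
    }

    /**
     * Stops tracking the operation and completes its future.
     *
     * @param exceptionMessage Error reported by the remote node; assumed to be null on success.
     * @return True if the operation was tracked, false otherwise.
     */
    boolean complete(UUID operationId, String exceptionMessage, String nodeName) {
        CompletableFuture<Void> future = operationsById.remove(operationId);

        if (future == null) {
            return false;
        }

        if (exceptionMessage == null) {
            future.complete(null);
        } else {
            // Assumed wrapping; the PR would use RemoteOperationException here.
            future.completeExceptionally(new RuntimeException(nodeName + ": " + exceptionMessage));
        }

        return true;
    }
}
```

   With such a method the handler would shrink to a lookup by sender name plus a
   single call, and the null check on the removed future would no longer leak
   into `DisasterRecoveryManager`.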



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]