alievmirza commented on code in PR #7008:
URL: https://github.com/apache/ignite-3/pull/7008#discussion_r2545265423


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1250,6 +1273,40 @@ private void putRecoveryTriggerIfRevisionIsNotProcessed(
                 });
     }
 
+    /**
+     * Forwards a disaster recovery request to a specific node.
+     *
+     * @param request The disaster recovery request to forward.
+     * @param revision Revision of event, which produce this recovery request, or -1 for manual requests.
+     * @param targetNodeName Name of the target node to forward the request to.
+     * @return Future that completes when the forwarded request is processed.
+     */
+    private CompletableFuture<Void> forwardDisasterRecoveryRequest(
+            DisasterRecoveryRequest request,
+            long revision,
+            String targetNodeName
+    ) {
+        byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE);

Review Comment:
   We need to implement an explicit serializer to support compatibility. See the implementations of `org.apache.ignite.internal.versioned.VersionedSerializer`. A corresponding test must be added as well.
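
   To illustrate what "explicit" buys us, here is a minimal, JDK-only sketch of a versioned round trip. It does not use the Ignite `VersionedSerializer` API; `VersionedRequestSketch`, `RestartRequest`, its fields, and the version numbers are hypothetical stand-ins. The point is only that the version is written first and the reader branches on it, so bytes produced by an older node stay readable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of explicit, versioned serialization. Not the Ignite API. */
final class VersionedRequestSketch {
    /** Hypothetical request payload; the cleanUp flag is assumed to be added in version 2. */
    static final class RestartRequest {
        final int zoneId;
        final String tableName;
        final boolean cleanUp;

        RestartRequest(int zoneId, String tableName, boolean cleanUp) {
            this.zoneId = zoneId;
            this.tableName = tableName;
            this.cleanUp = cleanUp;
        }
    }

    /** Version written by the current code. */
    static final byte CURRENT_VERSION = 2;

    /** Writes the version byte first, then every field explicitly. */
    static byte[] toBytes(RestartRequest request) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(CURRENT_VERSION);
            out.writeInt(request.zoneId);
            out.writeUTF(request.tableName);
            out.writeBoolean(request.cleanUp);
            out.flush();

            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the version byte and only reads the fields that exist in that version. */
    static RestartRequest fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte version = in.readByte();

            if (version < 1 || version > CURRENT_VERSION) {
                throw new IllegalArgumentException("Unsupported version: " + version);
            }

            int zoneId = in.readInt();
            String tableName = in.readUTF();
            // Version 1 payloads have no cleanUp flag, so fall back to false.
            boolean cleanUp = version >= 2 && in.readBoolean();

            return new RestartRequest(zoneId, tableName, cleanUp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   The compatibility test I am asking for would follow the same idea: keep bytes produced by an earlier version as a fixture and check that the current serializer still reads them.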



##########
modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerRestartPartitionsWithCleanupTest.java:
##########
@@ -219,9 +230,49 @@ public void testRestartTablePartitionsWithCleanupAllPartitions() throws Interrup
         MutableHttpRequest<?> post = HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,
                 new RestartPartitionsRequest(nodeName, FIRST_ZONE, QUALIFIED_TABLE_NAME, Set.of()));
 
-        HttpResponse<Void> response = client.toBlocking().exchange(post);
+        HttpResponse<Void> response = client1.toBlocking().exchange(post);
+
+        assertThat(response.getStatus().getCode(), is(OK.code()));
+    }
+
+    @Test
+    public void testRestartTablePartitionsWithCleanupAllPartitionsOnDifferentNode() throws InterruptedException {
+        awaitPartitionsToBeHealthy(FIRST_ZONE, Set.of());
+        IgniteImpl calledNode = unwrapIgniteImpl(CLUSTER.nodes().get(1));
+        IgniteImpl targetNode = unwrapIgniteImpl(CLUSTER.nodes().get(0));
+        AtomicBoolean targetIsCalled = new AtomicBoolean(false);
+        AtomicBoolean calledRepliedSuccessfully = new AtomicBoolean(false);
+
+        // Record that the called node sent the request.
+        calledNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof DisasterRecoveryRequestMessage) {
+                targetIsCalled.set(true);
+            }
+            return false; // do not drop the message.
+        });
+
+        // Record that the target node received the request responded successfully.
+        targetNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof DisasterRecoveryResponseMessage) {
+                DisasterRecoveryResponseMessage responseMessage = (DisasterRecoveryResponseMessage) msg;
+                if (responseMessage.errorMessage() == null) {
+                    calledRepliedSuccessfully.set(true);
+                }
+            }
+            return false; // do not drop the message.
+        });
+
+        Set<String> nodeName = Set.of(CLUSTER.nodes().get(0).name());
+
+        MutableHttpRequest<?> post = HttpRequest.POST(RESTART_PARTITIONS_WITH_CLEANUP_ENDPOINT,

Review Comment:
   We need the same test for `RESTART_ZONE_PARTITIONS_WITH_CLEANUP_ENDPOINT`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1250,6 +1273,40 @@ private void putRecoveryTriggerIfRevisionIsNotProcessed(
                 });
     }
 
+    /**
+     * Forwards a disaster recovery request to a specific node.
+     *
+     * @param request The disaster recovery request to forward.
+     * @param revision Revision of event, which produce this recovery request, or -1 for manual requests.
+     * @param targetNodeName Name of the target node to forward the request to.
+     * @return Future that completes when the forwarded request is processed.
+     */
+    private CompletableFuture<Void> forwardDisasterRecoveryRequest(
+            DisasterRecoveryRequest request,
+            long revision,
+            String targetNodeName
+    ) {
+        byte[] serializedRequest = VersionedSerialization.toBytes(request, DisasterRecoveryRequestSerializer.INSTANCE);
+
+        DisasterRecoveryRequestMessage message = PARTITION_REPLICATION_MESSAGES_FACTORY.disasterRecoveryRequestMessage()
+                .requestBytes(serializedRequest)
+                .revision(revision)
+                .build();
+
+        return messagingService.invoke(targetNodeName, message, TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS))
+                .thenApply(responseMsg -> {
+                    assert responseMsg instanceof DisasterRecoveryResponseMessage : responseMsg;
+
+                    DisasterRecoveryResponseMessage response = (DisasterRecoveryResponseMessage) responseMsg;
+
+                    if (response.errorMessage() != null) {
+                        String errorMessage = response.errorMessage();
+                        throw new DisasterRecoveryException(PARTITION_STATE_ERR,  errorMessage == null ? "Unknown error" : errorMessage);

Review Comment:
   From my point of view, this exception is too generic. I would consider creating a specific exception or using `NodesNotFoundException`.
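
   A rough, JDK-only sketch of what a more specific exception could look like. `ForwardingErrorSketch`, `RemoteRecoveryException`, and `handleResponse` are hypothetical names, and the response is reduced to a nullable error string, so this shows the shape of the idea rather than code for this PR.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: surface a remote failure as a dedicated exception type. */
final class ForwardingErrorSketch {
    /** Hypothetical exception that records which node reported the failure. */
    static final class RemoteRecoveryException extends RuntimeException {
        private final String nodeName;

        RemoteRecoveryException(String nodeName, String remoteError) {
            super("Disaster recovery request failed on node '" + nodeName + "': " + remoteError);
            this.nodeName = nodeName;
        }

        String nodeName() {
            return nodeName;
        }
    }

    /**
     * Completes normally when the remote error message is null, otherwise fails
     * with a RemoteRecoveryException that carries the target node name.
     */
    static CompletableFuture<Void> handleResponse(CompletableFuture<String> errorMessageFuture, String nodeName) {
        return errorMessageFuture.thenAccept(errorMessage -> {
            if (errorMessage != null) {
                throw new RemoteRecoveryException(nodeName, errorMessage);
            }
        });
    }
}
```

   Callers can then tell a failure reported by the target node apart from a local one by the exception type, instead of parsing the message.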



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.