rpuch commented on code in PR #3614:
URL: https://github.com/apache/ignite-3/pull/3614#discussion_r1568857289


##########
modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.PathVariable;
+import io.micronaut.http.annotation.Produces;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.constants.MediaType;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery")
+@Tag(name = "recovery")
+public interface DisasterRecoveryApi {
+    @Get("state/local")
+    @Operation(operationId = "getLocalPartitionStates", description = "Returns 
local partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states 
returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates();
+
+    @Get("state/local/{zoneName}")
+    @Operation(operationId = "getLocalPartitionStatesByZone", description = 
"Returns local partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states 
returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @ApiResponse(responseCode = "400", description = "Zone is not found.",

Review Comment:
   Why isn't it 404?



##########
modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.PathVariable;
+import io.micronaut.http.annotation.Produces;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.constants.MediaType;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery")
+@Tag(name = "recovery")
+public interface DisasterRecoveryApi {
+    @Get("state/local")
+    @Operation(operationId = "getLocalPartitionStates", description = "Returns 
local partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states 
returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",

Review Comment:
   Is it required to explicitly list code 500 for internal errors? `ComputeApi` 
does not do that, and it seems logical for Micronaut to just map anything not 
listed explicitly to 500.



##########
modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.http.client.exceptions.HttpClientResponseException;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.List;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for disaster recovery REST commands.
+ */
+@MicronautTest
+public class ItDisasterRecoveryControllerTest extends 
ClusterPerTestIntegrationTest {
+    private static final String NODE_URL = "http://localhost:"; + 
Cluster.BASE_HTTP_PORT;
+
+    @Inject
+    @Client(NODE_URL + "/management/v1/recovery/")
+    HttpClient client;
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Test
+    void testLocalPartitionStates() {
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
+        var response = client.toBlocking().exchange("/state/local/", 
LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        LocalPartitionStatesResponse body = response.body();
+        assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+
+        List<Integer> partitionIds = 
body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList());
+        assertEquals(range(0, 
DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+    }
+
+    @Test
+    void testLocalPartitionStatesByZoneMissingZone() {
+        HttpClientResponseException thrown = assertThrows(
+                HttpClientResponseException.class,
+                () -> client.toBlocking().exchange("/state/local/foo/", 
LocalPartitionStatesResponse.class)
+        );
+
+        assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+    }
+
+    @Test
+    void testLocalPartitionStatesByZone() {
+        executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)");
+
+        executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + 
DEFAULT_AIPERSIST_PROFILE_NAME + "'");
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH 
PRIMARY_ZONE = 'FOO'");
+
+        var response = client.toBlocking().exchange("/state/local/Default/", 
LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+        assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size());
+
+        response = client.toBlocking().exchange("/state/local/FOO/", 
LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+        assertEquals(1, response.body().states().size());
+    }
+
+    @Test
+    void testLocalPartitionStatesByZoneJson() {
+        executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + 
DEFAULT_AIPERSIST_PROFILE_NAME + "'");
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH 
PRIMARY_ZONE = 'FOO'");
+
+        HttpResponse<String> response = 
client.toBlocking().exchange("/state/local/FOO/", String.class);
+
+        assertEquals(
+                
"{'states':[{'partitionId':0,'tableName':'FOO','nodeName':'idrct_tlpsbzj_0','state':'HEALTHY'}]}".replace('\'',
 '"'),

Review Comment:
   This seems brittle. I think Java does not guarantee any field/method order 
while traversing a class. Maybe Micronaut does (in accordance with the order in 
`openapi.yaml`), but anyway the order is not significant here. Probably the 
Micronaut testing framework should have ways to assert JSONs equality in terms 
of content, not in terms of their string representation.



##########
modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.http.client.exceptions.HttpClientResponseException;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.List;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for disaster recovery REST commands.
+ */
+@MicronautTest
+public class ItDisasterRecoveryControllerTest extends 
ClusterPerTestIntegrationTest {
+    private static final String NODE_URL = "http://localhost:"; + 
Cluster.BASE_HTTP_PORT;
+
+    @Inject
+    @Client(NODE_URL + "/management/v1/recovery/")
+    HttpClient client;
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Test
+    void testLocalPartitionStates() {
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
+        var response = client.toBlocking().exchange("/state/local/", 
LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        LocalPartitionStatesResponse body = response.body();
+        assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+
+        List<Integer> partitionIds = 
body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList());
+        assertEquals(range(0, 
DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+    }
+
+    @Test
+    void testLocalPartitionStatesByZoneMissingZone() {
+        HttpClientResponseException thrown = assertThrows(
+                HttpClientResponseException.class,
+                () -> client.toBlocking().exchange("/state/local/foo/", 
LocalPartitionStatesResponse.class)
+        );
+
+        assertEquals(HttpStatus.BAD_REQUEST, thrown.getResponse().status());
+    }
+
+    @Test
+    void testLocalPartitionStatesByZone() {
+        executeSql("CREATE TABLE def (id INT PRIMARY KEY, val INT)");
+
+        executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + 
DEFAULT_AIPERSIST_PROFILE_NAME + "'");
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH 
PRIMARY_ZONE = 'FOO'");
+
+        var response = client.toBlocking().exchange("/state/local/Default/", 
LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+        assertEquals(DEFAULT_PARTITION_COUNT, response.body().states().size());
+
+        response = client.toBlocking().exchange("/state/local/FOO/", 
LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+        assertEquals(1, response.body().states().size());
+    }
+
+    @Test
+    void testLocalPartitionStatesByZoneJson() {
+        executeSql("CREATE ZONE foo WITH partitions=1, storage_profiles='" + 
DEFAULT_AIPERSIST_PROFILE_NAME + "'");
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT) WITH 
PRIMARY_ZONE = 'FOO'");
+
+        HttpResponse<String> response = 
client.toBlocking().exchange("/state/local/FOO/", String.class);
+
+        assertEquals(
+                
"{'states':[{'partitionId':0,'tableName':'FOO','nodeName':'idrct_tlpsbzj_0','state':'HEALTHY'}]}".replace('\'',
 '"'),
+                response.body()
+        );
+    }
+
+    @Test
+    void testGlobalPartitionStates() {
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
+        var response = client.toBlocking().exchange("/state/global/", 
GlobalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        GlobalPartitionStatesResponse body = response.body();
+        assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+
+        List<Integer> partitionIds = 
body.states().stream().map(GlobalPartitionStateResponse::partitionId).collect(toList());
+        assertEquals(range(0, 
DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+    }
+
+    @Test
+    void testGlobalPartitionStatesByZoneMissingZone() {
+        HttpClientResponseException thrown = assertThrows(
+                HttpClientResponseException.class,
+                () -> client.toBlocking().exchange("/state/global/foo/", 
GlobalPartitionStatesResponse.class)

Review Comment:
   ```suggestion
                   () -> 
client.toBlocking().exchange("/state/global/no-such-zone/", 
GlobalPartitionStatesResponse.class)
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/GlobalPartitionStateEnum.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.disaster;
+
+/**
+ * Enum for states of partitions.
+ */
+public enum GlobalPartitionStateEnum {
+    /** All replicas are healthy. */
+    AVAILABLE,
+
+    /** There are healthy replicas, but they don't form a majority. */
+    READ_ONLY,
+
+    /** There are healthy replicas, and they form a majority. */

Review Comment:
   Would it make sense to define them in the order of 'how damaged the 
partition is', so AVAILABLE->DEGRADED->READ-ONLY->UNAVAILABLE?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -175,26 +191,55 @@ public CompletableFuture<Void> resetPartitions(int 
zoneId, int tableId) {
      * @param zoneName Zone name.

Review Comment:
   Please clarify what it means to pass `null` here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -357,36 +402,94 @@ private static LocalPartitionStateEnum convertState(State 
nodeState) {
 
             default:
                 // Unrecognized state, better safe than sorry.
-                return LocalPartitionStateEnum.BROKEN;
+                return BROKEN;
         }
     }
 
     /**
      * Replaces some healthy states with a {@link 
LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all 
peers is
      * known.
      */
-    private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize(
-            Map<TablePartitionId, Map<String, LocalPartitionState>> result
+    private static Map<TablePartitionId, Map<String, LocalPartitionState>> 
normalizeLocal(
+            Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> 
result,
+            Catalog catalog
     ) {
         return 
result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> 
{
-            Map<String, LocalPartitionState> map = entry.getValue();
+            TablePartitionId tablePartitionId = entry.getKey();
+            Map<String, LocalPartitionStateMessage> map = entry.getValue();
 
             // noinspection OptionalGetWithoutIsPresent
-            long maxLogIndex = 
map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong();
+            long maxLogIndex = 
map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
 
             return 
map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> {
-                LocalPartitionState state = entry2.getValue();
+                LocalPartitionStateMessage stateMsg = entry2.getValue();
 
-                if (state.state() != LocalPartitionStateEnum.HEALTHY || 
maxLogIndex - state.logIndex() < CATCH_UP_THRESHOLD) {
-                    return state;
+                LocalPartitionStateEnum stateEnum = stateMsg.state();
+
+                if (stateMsg.state() == HEALTHY && maxLogIndex - 
stateMsg.logIndex() >= CATCH_UP_THRESHOLD) {
+                    stateEnum = CATCHING_UP;
                 }
 
-                return MSG_FACTORY.localPartitionState()
-                        .state(LocalPartitionStateEnum.CATCHING_UP)
-                        .partitionId(state.partitionId())
-                        .logIndex(state.logIndex())
-                        .build();
+                CatalogTableDescriptor tableDescriptor = 
catalog.table(tablePartitionId.tableId());
+                return new LocalPartitionState(tableDescriptor.name(), 
tablePartitionId.partitionId(), stateEnum);
             }));
         }));
     }
+
+    private static Map<TablePartitionId, GlobalPartitionState> normalizeGlobal(
+            Map<TablePartitionId, Map<String, LocalPartitionState>> 
localResult,
+            Catalog catalog
+    ) {
+        Map<Integer, Integer> tableIdToPartitions = new HashMap<>();
+
+        Map<TablePartitionId, GlobalPartitionState> result = 
localResult.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+                    TablePartitionId tablePartitionId = entry.getKey();
+                    Map<String, LocalPartitionState> map = entry.getValue();
+
+                    int zoneId = 
catalog.table(tablePartitionId.tableId()).zoneId();

Review Comment:
   Is it somehow guaranteed that this node contains in its local Catalog all 
tables that might come from other nodes in responses?



##########
modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStatesResponse.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+import java.util.List;
+
+/**
+ * Global partition states schema class.
+ */
+@Schema(description = "Information about global partition states.")
+public class GlobalPartitionStatesResponse {
+    @Schema
+    private final List<GlobalPartitionStateResponse> states;
+
+    @JsonCreator
+    public GlobalPartitionStatesResponse(@JsonProperty("states") 
List<GlobalPartitionStateResponse> states) {
+        this.states = states;

Review Comment:
   How about making a defensive copy?



##########
modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.PathVariable;
+import io.micronaut.http.annotation.Produces;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.constants.MediaType;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery")
+@Tag(name = "recovery")

Review Comment:
   Should it be 'disaster-recovery' or both 'disaster' and 'recovery'? As 
'recovery' might mean many things in the context of a DBMS.



##########
modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/GlobalPartitionStateResponse.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.v3.oas.annotations.media.Schema;
+
+/**
+ * Global partition state schema class.
+ */
+@Schema(description = "Information about global partition state.")
+public class GlobalPartitionStateResponse {
+    private final int partitionId;
+    private final String tableName;
+    private final String state;

Review Comment:
   Is it an enum? If yes, is it possible to use an enum type here?



##########
modules/rest-api/src/main/java/org/apache/ignite/internal/rest/api/recovery/DisasterRecoveryApi.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.api.recovery;
+
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.PathVariable;
+import io.micronaut.http.annotation.Produces;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.media.Content;
+import io.swagger.v3.oas.annotations.media.Schema;
+import io.swagger.v3.oas.annotations.responses.ApiResponse;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.rest.api.Problem;
+import org.apache.ignite.internal.rest.constants.MediaType;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery")
+@Tag(name = "recovery")
+public interface DisasterRecoveryApi {
+    @Get("state/local")
+    @Operation(operationId = "getLocalPartitionStates", description = "Returns 
local partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states 
returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<LocalPartitionStatesResponse> getLocalPartitionStates();
+
+    @Get("state/local/{zoneName}")
+    @Operation(operationId = "getLocalPartitionStatesByZone", description = 
"Returns local partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states 
returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @ApiResponse(responseCode = "400", description = "Zone is not found.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<LocalPartitionStatesResponse> 
getLocalPartitionStates(@PathVariable("zoneName") String zoneName);
+
+    @Get("state/global")
+    @Operation(operationId = "getGlobalPartitionStates", description = 
"Returns global partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states 
returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @Produces(MediaType.APPLICATION_JSON)
+    CompletableFuture<GlobalPartitionStatesResponse> 
getGlobalPartitionStates();
+
+    @Get("state/global/{zoneName}")
+    @Operation(operationId = "getGlobalPartitionStatesByZone", description = 
"Returns global partition states.")
+    @ApiResponse(responseCode = "200", description = "Partition states 
returned.")
+    @ApiResponse(responseCode = "500", description = "Internal error.",
+            content = @Content(mediaType = MediaType.PROBLEM_JSON, schema = 
@Schema(implementation = Problem.class)))
+    @ApiResponse(responseCode = "400", description = "Zone is not found.",

Review Comment:
   404?



##########
modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.Comparator.comparing;
+import static 
org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory.DISASTER_RECOVERY_MANAGER_NAME;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.annotation.Controller;
+import jakarta.inject.Named;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler;
+import 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
+import 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
+import 
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery/")
+@Requires(classes = IgniteInternalExceptionHandler.class)
+public class DisasterRecoveryController implements DisasterRecoveryApi {
+    private final DisasterRecoveryManager disasterRecoveryManager;
+
+    public DisasterRecoveryController(@Named(DISASTER_RECOVERY_MANAGER_NAME) 
DisasterRecoveryManager disasterRecoveryManager) {

Review Comment:
   Is the name needed? Can we have more than 1 `DisasterRecoveryManager` 
instance?



##########
modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.Comparator.comparing;
+import static 
org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory.DISASTER_RECOVERY_MANAGER_NAME;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.annotation.Controller;
+import jakarta.inject.Named;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler;
+import 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
+import 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
+import 
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery/")
+@Requires(classes = IgniteInternalExceptionHandler.class)
+public class DisasterRecoveryController implements DisasterRecoveryApi {
+    private final DisasterRecoveryManager disasterRecoveryManager;
+
+    public DisasterRecoveryController(@Named(DISASTER_RECOVERY_MANAGER_NAME) 
DisasterRecoveryManager disasterRecoveryManager) {
+        this.disasterRecoveryManager = disasterRecoveryManager;
+    }
+
+    @Override
+    public CompletableFuture<LocalPartitionStatesResponse> 
getLocalPartitionStates() {
+        return 
disasterRecoveryManager.localPartitionStates(null).thenApply(DisasterRecoveryController::convertLocalStates);
+    }
+
+    @Override
+    public CompletableFuture<LocalPartitionStatesResponse> 
getLocalPartitionStates(String zoneName) {
+        return 
disasterRecoveryManager.localPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertLocalStates);
+    }
+
+    @Override
+    public CompletableFuture<GlobalPartitionStatesResponse> 
getGlobalPartitionStates() {
+        return 
disasterRecoveryManager.globalPartitionStates(null).thenApply(DisasterRecoveryController::convertGlobalStates);
+    }
+
+    @Override
+    public CompletableFuture<GlobalPartitionStatesResponse> 
getGlobalPartitionStates(String zoneName) {
+        return 
disasterRecoveryManager.globalPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertGlobalStates);
+    }
+
+    private static LocalPartitionStatesResponse 
convertLocalStates(Map<TablePartitionId, Map<String, LocalPartitionState>> 
localStates) {
+        List<LocalPartitionStateResponse> states = new ArrayList<>();
+
+        for (Entry<TablePartitionId, Map<String, LocalPartitionState>> entry0 
: localStates.entrySet()) {

Review Comment:
   It seems we don't need keys from the outer map, so a simpler iteration over 
values of this map is possible



##########
modules/rest/src/main/java/org/apache/ignite/internal/rest/recovery/DisasterRecoveryController.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.Comparator.comparing;
+import static 
org.apache.ignite.internal.rest.recovery.DisasterRecoveryFactory.DISASTER_RECOVERY_MANAGER_NAME;
+
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.annotation.Controller;
+import jakarta.inject.Named;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.rest.api.recovery.DisasterRecoveryApi;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.exception.handler.IgniteInternalExceptionHandler;
+import 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
+import 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
+import 
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionState;
+
+/**
+ * Disaster recovery controller.
+ */
+@Controller("/management/v1/recovery/")
+@Requires(classes = IgniteInternalExceptionHandler.class)
+public class DisasterRecoveryController implements DisasterRecoveryApi {
+    private final DisasterRecoveryManager disasterRecoveryManager;
+
+    public DisasterRecoveryController(@Named(DISASTER_RECOVERY_MANAGER_NAME) 
DisasterRecoveryManager disasterRecoveryManager) {
+        this.disasterRecoveryManager = disasterRecoveryManager;
+    }
+
+    @Override
+    public CompletableFuture<LocalPartitionStatesResponse> 
getLocalPartitionStates() {
+        return 
disasterRecoveryManager.localPartitionStates(null).thenApply(DisasterRecoveryController::convertLocalStates);
+    }
+
+    @Override
+    public CompletableFuture<LocalPartitionStatesResponse> 
getLocalPartitionStates(String zoneName) {
+        return 
disasterRecoveryManager.localPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertLocalStates);
+    }
+
+    @Override
+    public CompletableFuture<GlobalPartitionStatesResponse> 
getGlobalPartitionStates() {
+        return 
disasterRecoveryManager.globalPartitionStates(null).thenApply(DisasterRecoveryController::convertGlobalStates);
+    }
+
+    @Override
+    public CompletableFuture<GlobalPartitionStatesResponse> 
getGlobalPartitionStates(String zoneName) {
+        return 
disasterRecoveryManager.globalPartitionStates(zoneName).thenApply(DisasterRecoveryController::convertGlobalStates);
+    }
+
+    private static LocalPartitionStatesResponse 
convertLocalStates(Map<TablePartitionId, Map<String, LocalPartitionState>> 
localStates) {
+        List<LocalPartitionStateResponse> states = new ArrayList<>();
+
+        for (Entry<TablePartitionId, Map<String, LocalPartitionState>> entry0 
: localStates.entrySet()) {
+            for (Entry<String, LocalPartitionState> entry1 : 
entry0.getValue().entrySet()) {
+                String nodeName = entry1.getKey();
+                LocalPartitionState state = entry1.getValue();
+
+                states.add(new LocalPartitionStateResponse(
+                        state.partitionId,
+                        state.tableName,
+                        nodeName,
+                        state.state.name()
+                ));
+            }
+        }
+
+        // Sort the output conveniently.
+        states.sort(comparing(LocalPartitionStateResponse::tableName)
+                .thenComparingInt(LocalPartitionStateResponse::partitionId)
+                .thenComparing(LocalPartitionStateResponse::nodeName));
+
+        return new LocalPartitionStatesResponse(states);
+    }
+
+    private static GlobalPartitionStatesResponse 
convertGlobalStates(Map<TablePartitionId, GlobalPartitionState> globalStates) {
+        List<GlobalPartitionStateResponse> states = new ArrayList<>();
+
+        for (Entry<TablePartitionId, GlobalPartitionState> entry : 
globalStates.entrySet()) {

Review Comment:
   Here, we don't need map keys as well, so we could just iterate over 
`values()`



##########
modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerTest.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rest.recovery;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
+import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.http.client.exceptions.HttpClientResponseException;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.util.List;
+import org.apache.ignite.internal.Cluster;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.GlobalPartitionStatesResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStateResponse;
+import 
org.apache.ignite.internal.rest.api.recovery.LocalPartitionStatesResponse;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for disaster recovery REST commands.
+ */
+@MicronautTest
+public class ItDisasterRecoveryControllerTest extends 
ClusterPerTestIntegrationTest {
+    private static final String NODE_URL = "http://localhost:"; + 
Cluster.BASE_HTTP_PORT;
+
+    @Inject
+    @Client(NODE_URL + "/management/v1/recovery/")
+    HttpClient client;
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @Test
+    void testLocalPartitionStates() {
+        executeSql("CREATE TABLE foo (id INT PRIMARY KEY, val INT)");
+        var response = client.toBlocking().exchange("/state/local/", 
LocalPartitionStatesResponse.class);
+
+        assertEquals(HttpStatus.OK, response.status());
+
+        LocalPartitionStatesResponse body = response.body();
+        assertEquals(DEFAULT_PARTITION_COUNT, body.states().size());
+
+        List<Integer> partitionIds = 
body.states().stream().map(LocalPartitionStateResponse::partitionId).collect(toList());
+        assertEquals(range(0, 
DEFAULT_PARTITION_COUNT).boxed().collect(toList()), partitionIds);
+    }
+
+    @Test
+    void testLocalPartitionStatesByZoneMissingZone() {
+        HttpClientResponseException thrown = assertThrows(
+                HttpClientResponseException.class,
+                () -> client.toBlocking().exchange("/state/local/foo/", 
LocalPartitionStatesResponse.class)

Review Comment:
   ```suggestion
                   () -> 
client.toBlocking().exchange("/state/local/no-such-zone/", 
LocalPartitionStatesResponse.class)
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -78,7 +90,7 @@ public class DisasterRecoveryManager implements 
IgniteComponent {
 
     private static final int TIMEOUT = 30;
 
-    private static final int CATCH_UP_THRESHOLD = 10;
+    private static final int CATCH_UP_THRESHOLD = 100;

Review Comment:
   Could you please add javadocs to `TIMEOUT` and `CATCH_UP_THRESHOLD` 
explaining what is means and in which units they are measured? Also, maybe it 
makes sense to include the unit in the name for the timeout constant, like 
`TIMEOUT_SEC`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -175,26 +191,55 @@ public CompletableFuture<Void> resetPartitions(int 
zoneId, int tableId) {
      * @param zoneName Zone name.
      * @return Future with the mapping.
      */
-    public CompletableFuture<Map<TablePartitionId, Map<String, 
LocalPartitionState>>> partitionStates(String zoneName) {
-        int latestCatalogVersion = catalogManager.latestCatalogVersion();
-        Optional<CatalogZoneDescriptor> zoneDesciptorOptional = 
catalogManager.zones(latestCatalogVersion).stream()
-                .filter(catalogZoneDescriptor -> 
catalogZoneDescriptor.name().equals(zoneName))
-                .findAny();
-
-        if (zoneDesciptorOptional.isEmpty()) {
-            return CompletableFuture.failedFuture(new 
DistributionZoneNotFoundException(zoneName, null));
-        }
+    public CompletableFuture<Map<TablePartitionId, Map<String, 
LocalPartitionState>>> localPartitionStates(@Nullable String zoneName) {
+        Catalog catalog = 
catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+        return localPartitionStatesInternal(zoneName, catalog)
+                .thenApply(res -> normalizeLocal(res, catalog));
+    }
 
-        CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get();
+    /**
+     * Returns partition states for all zones' partitions in the cluster. 
Result is a mapping of {@link TablePartitionId} to the global
+     * partition state enum value.
+     *
+     * @param zoneName Zone name.
+     * @return Future with the mapping.
+     */
+    public CompletableFuture<Map<TablePartitionId, GlobalPartitionState>> 
globalPartitionStates(@Nullable String zoneName) {
+        Catalog catalog = 
catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+        return localPartitionStatesInternal(zoneName, catalog)
+                .thenApply(res -> normalizeLocal(res, catalog))
+                .thenApply(res -> normalizeGlobal(res, catalog));
+    }
+
+    private CompletableFuture<Map<TablePartitionId, Map<String, 
LocalPartitionStateMessage>>> localPartitionStatesInternal(
+            @Nullable String zoneName, Catalog catalog
+    ) {
+        int zoneId;
+        if (zoneName == null) {
+            zoneId = -1;

Review Comment:
   Please extract this to a constant



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -357,36 +402,94 @@ private static LocalPartitionStateEnum convertState(State 
nodeState) {
 
             default:
                 // Unrecognized state, better safe than sorry.
-                return LocalPartitionStateEnum.BROKEN;
+                return BROKEN;
         }
     }
 
     /**
      * Replaces some healthy states with a {@link 
LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all 
peers is
      * known.
      */
-    private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize(
-            Map<TablePartitionId, Map<String, LocalPartitionState>> result
+    private static Map<TablePartitionId, Map<String, LocalPartitionState>> 
normalizeLocal(
+            Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> 
result,
+            Catalog catalog
     ) {
         return 
result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> 
{
-            Map<String, LocalPartitionState> map = entry.getValue();
+            TablePartitionId tablePartitionId = entry.getKey();
+            Map<String, LocalPartitionStateMessage> map = entry.getValue();
 
             // noinspection OptionalGetWithoutIsPresent
-            long maxLogIndex = 
map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong();
+            long maxLogIndex = 
map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
 
             return 
map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> {

Review Comment:
   `toMap()` can be imported statically



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -175,26 +191,55 @@ public CompletableFuture<Void> resetPartitions(int 
zoneId, int tableId) {
      * @param zoneName Zone name.
      * @return Future with the mapping.
      */
-    public CompletableFuture<Map<TablePartitionId, Map<String, 
LocalPartitionState>>> partitionStates(String zoneName) {
-        int latestCatalogVersion = catalogManager.latestCatalogVersion();
-        Optional<CatalogZoneDescriptor> zoneDesciptorOptional = 
catalogManager.zones(latestCatalogVersion).stream()
-                .filter(catalogZoneDescriptor -> 
catalogZoneDescriptor.name().equals(zoneName))
-                .findAny();
-
-        if (zoneDesciptorOptional.isEmpty()) {
-            return CompletableFuture.failedFuture(new 
DistributionZoneNotFoundException(zoneName, null));
-        }
+    public CompletableFuture<Map<TablePartitionId, Map<String, 
LocalPartitionState>>> localPartitionStates(@Nullable String zoneName) {
+        Catalog catalog = 
catalogManager.catalog(catalogManager.latestCatalogVersion());
+
+        return localPartitionStatesInternal(zoneName, catalog)
+                .thenApply(res -> normalizeLocal(res, catalog));
+    }
 
-        CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get();
+    /**
+     * Returns partition states for all zones' partitions in the cluster. 
Result is a mapping of {@link TablePartitionId} to the global
+     * partition state enum value.
+     *
+     * @param zoneName Zone name.

Review Comment:
   Please explain what `null` means here



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -357,36 +402,94 @@ private static LocalPartitionStateEnum convertState(State 
nodeState) {
 
             default:
                 // Unrecognized state, better safe than sorry.
-                return LocalPartitionStateEnum.BROKEN;
+                return BROKEN;
         }
     }
 
     /**
      * Replaces some healthy states with a {@link 
LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all 
peers is
      * known.
      */
-    private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize(
-            Map<TablePartitionId, Map<String, LocalPartitionState>> result
+    private static Map<TablePartitionId, Map<String, LocalPartitionState>> 
normalizeLocal(
+            Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> 
result,
+            Catalog catalog
     ) {
         return 
result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> 
{
-            Map<String, LocalPartitionState> map = entry.getValue();
+            TablePartitionId tablePartitionId = entry.getKey();
+            Map<String, LocalPartitionStateMessage> map = entry.getValue();
 
             // noinspection OptionalGetWithoutIsPresent
-            long maxLogIndex = 
map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong();
+            long maxLogIndex = 
map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
 
             return 
map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> {
-                LocalPartitionState state = entry2.getValue();
+                LocalPartitionStateMessage stateMsg = entry2.getValue();
 
-                if (state.state() != LocalPartitionStateEnum.HEALTHY || 
maxLogIndex - state.logIndex() < CATCH_UP_THRESHOLD) {
-                    return state;
+                LocalPartitionStateEnum stateEnum = stateMsg.state();
+
+                if (stateMsg.state() == HEALTHY && maxLogIndex - 
stateMsg.logIndex() >= CATCH_UP_THRESHOLD) {
+                    stateEnum = CATCHING_UP;
                 }
 
-                return MSG_FACTORY.localPartitionState()
-                        .state(LocalPartitionStateEnum.CATCHING_UP)
-                        .partitionId(state.partitionId())
-                        .logIndex(state.logIndex())
-                        .build();
+                CatalogTableDescriptor tableDescriptor = 
catalog.table(tablePartitionId.tableId());
+                return new LocalPartitionState(tableDescriptor.name(), 
tablePartitionId.partitionId(), stateEnum);
             }));
         }));
     }
+
+    private static Map<TablePartitionId, GlobalPartitionState> normalizeGlobal(

Review Comment:
   ```suggestion
       private static Map<TablePartitionId, GlobalPartitionState> 
assembleGlobal(
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -357,36 +402,94 @@ private static LocalPartitionStateEnum convertState(State 
nodeState) {
 
             default:
                 // Unrecognized state, better safe than sorry.
-                return LocalPartitionStateEnum.BROKEN;
+                return BROKEN;
         }
     }
 
     /**
      * Replaces some healthy states with a {@link 
LocalPartitionStateEnum#CATCHING_UP},it can only be done once the state of all 
peers is
      * known.
      */
-    private Map<TablePartitionId, Map<String, LocalPartitionState>> normalize(
-            Map<TablePartitionId, Map<String, LocalPartitionState>> result
+    private static Map<TablePartitionId, Map<String, LocalPartitionState>> 
normalizeLocal(
+            Map<TablePartitionId, Map<String, LocalPartitionStateMessage>> 
result,
+            Catalog catalog
     ) {
         return 
result.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> 
{
-            Map<String, LocalPartitionState> map = entry.getValue();
+            TablePartitionId tablePartitionId = entry.getKey();
+            Map<String, LocalPartitionStateMessage> map = entry.getValue();
 
             // noinspection OptionalGetWithoutIsPresent
-            long maxLogIndex = 
map.values().stream().mapToLong(LocalPartitionState::logIndex).max().getAsLong();
+            long maxLogIndex = 
map.values().stream().mapToLong(LocalPartitionStateMessage::logIndex).max().getAsLong();
 
             return 
map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry2 -> {
-                LocalPartitionState state = entry2.getValue();
+                LocalPartitionStateMessage stateMsg = entry2.getValue();
 
-                if (state.state() != LocalPartitionStateEnum.HEALTHY || 
maxLogIndex - state.logIndex() < CATCH_UP_THRESHOLD) {
-                    return state;
+                LocalPartitionStateEnum stateEnum = stateMsg.state();
+
+                if (stateMsg.state() == HEALTHY && maxLogIndex - 
stateMsg.logIndex() >= CATCH_UP_THRESHOLD) {
+                    stateEnum = CATCHING_UP;
                 }
 
-                return MSG_FACTORY.localPartitionState()
-                        .state(LocalPartitionStateEnum.CATCHING_UP)
-                        .partitionId(state.partitionId())
-                        .logIndex(state.logIndex())
-                        .build();
+                CatalogTableDescriptor tableDescriptor = 
catalog.table(tablePartitionId.tableId());
+                return new LocalPartitionState(tableDescriptor.name(), 
tablePartitionId.partitionId(), stateEnum);
             }));
         }));
     }
+
+    private static Map<TablePartitionId, GlobalPartitionState> normalizeGlobal(
+            Map<TablePartitionId, Map<String, LocalPartitionState>> 
localResult,
+            Catalog catalog
+    ) {
+        Map<Integer, Integer> tableIdToPartitions = new HashMap<>();
+
+        Map<TablePartitionId, GlobalPartitionState> result = 
localResult.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+                    TablePartitionId tablePartitionId = entry.getKey();

Review Comment:
   The following block of code looks pretty scary as a lambda. I would suggest 
to extract it as a method, but filling of `tableIdToPartitions` that is done as 
a side effect of that lambda makes it more difficult. So, I suggest the 
following:
   
   1. Do not fill `tableIdToPartitions` in the same block of code; instead, 
make second iteration over the original map to fill this variable. This 
aggregation will be extremely rare, so we'll not lose much CPU cycles here.
   2. After the lambda does not touch `tableIdToPartitions` anymore, extract it 
to a method like `assembleGlobalStateFromLocal()`
   3. This method and the code for filling `tableIdToPartitions` would reuse 
the same method to get zone descriptor
   
   It seems that after this refactoring it will become easier to understand 
this method: it will become much shorter and less coupled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to