sanpwc commented on code in PR #6288:
URL: https://github.com/apache/ignite-3/pull/6288#discussion_r2222303860


##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateMessage.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
+
+/**
+ * A message for reading estimated number of rows for tables stored on this node.
+ */
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE)
+public interface LocalTableStateMessage extends NetworkMessage {

Review Comment:
   Same as above - LocalTablePartitionStateMessage.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java:
##########
@@ -23,7 +23,7 @@
 import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
 
 /**
- * Response for {@link LocalPartitionStatesResponse}.
+ * Response for {@link LocalPartitionStatesRequest}.

Review Comment:
   It's response))



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateRequest.java:
##########
@@ -15,39 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.disaster;
+package org.apache.ignite.internal.partition.replicator.network.disaster;
 
-import org.apache.ignite.internal.tostring.S;
+import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
 
 /**
- * Table state.
+ * Request for reading table states from the node.
  */
-public class TableState {
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE_REQUEST)
+public interface LocalTableStateRequest extends NetworkMessage {
+    List<ZonePartitionIdMessage> zonePartitionIds();
 
-    private final int tableId;
-    private final String schemaName;

Review Comment:
   Previously there was a schemaName field. Why was it removed?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateResponse.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+
+/**
+ * Response for {@link LocalTableStateRequest}.
+ */
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE_RESPONSE)
+public interface LocalTableStateResponse extends NetworkMessage {
+    List<LocalTableStateMessage> states();

Review Comment:
   Why do we need a list here?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java:
##########
@@ -287,5 +290,14 @@ interface DisasterRecoveryMessages {
 
         /** Message type for {@link LocalPartitionStatesResponse}. */
         short LOCAL_PARTITION_STATE_RESPONSE = 102;
+
+        /** Message type for {@link LocalTableStateMessage}. */
+        short LOCAL_TABLE_STATE = 103;

Review Comment:
   I'd rather name it LOCAL_TABLE_**PARTITION**_STATE, etc.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateMessage.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
+
+/**
+ * A message for reading estimated number of rows for tables stored on this node.
+ */
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE)
+public interface LocalTableStateMessage extends NetworkMessage {
+
+    /** Zone Partition ID. */
+    ZonePartitionIdMessage zonePartitionId();
+
+    /**
+     * Returns estimated number of rows for partitions stored on this node.
+     *
+     * <p>When colocation is enabled, and we want to get estimated rows for each of the tables, this method should be used.

Review Comment:
   The javadoc is a bit messy.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -845,16 +856,110 @@ public CompletableFuture<Map<TablePartitionId, GlobalTablePartitionState>> globa
         }
     }
 
+    /**
+     * Converts {@link LocalPartitionStateMessageByNode} to a mapping of zone names to the set of zone partitions.
+     *
+     * @param partitionStateMap Partition state map.
+     * @return Mapping of zone names to the set of zone partitions.
+     */
+    private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(
+            Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap
+    ) {
+        Map<String, Set<ZonePartitionId>> res = new HashMap<>();
+
+        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) {
+            ZonePartitionId zonePartitionId = entry.getKey();
+
+            LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+            for (String nodeName : zoneLocalPartitionStateMessageByNode.nodes()) {
+                res.computeIfAbsent(nodeName, k -> new HashSet<>()).add(zonePartitionId);
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns estimated number of rows for each table having a partition in the specified zones.
+     *
+     * <p>The result is returned from the nodes specified in the {@code zonesOnNodes.keySet()} -
+     * these are the nodes we previously received partition states from.
+     *
+     * @param zonesOnNodes Mapping of node names to the set of zone partitions.
+     * @param catalogVersion Catalog version.
+     * @return Future with the mapping.
+     */
+    private CompletableFuture<Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>>> tableStateForZone(

Review Comment:
   Why is it so complicated (Map<Map, Map>)? All we need is a mapping from 
TablePartitionId to estimated size, right?
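
A minimal sketch of the flatter shape asked about here, under stated assumptions: the class name `FlatEstimatedSizes` and the method `flatten` are hypothetical and not part of the PR, the key types are left generic instead of using the real Ignite ID classes, and keeping the largest estimate when several nodes report the same table partition is an assumption rather than the PR's behavior.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of the PR. */
public class FlatEstimatedSizes {
    /**
     * Flattens node -> zone partition -> (table partition -> estimated rows)
     * into table partition -> estimated rows.
     * Assumption: when several nodes report the same table partition, keep the largest estimate.
     */
    public static <N, Z, T> Map<T, Long> flatten(Map<N, Map<Z, Map<T, Long>>> nested) {
        Map<T, Long> flat = new HashMap<>();

        for (Map<Z, Map<T, Long>> byZonePartition : nested.values()) {
            for (Map<T, Long> byTablePartition : byZonePartition.values()) {
                // merge() inserts the value or combines it with the existing one via Long::max.
                byTablePartition.forEach((id, rows) -> flat.merge(id, rows, Long::max));
            }
        }

        return flat;
    }
}
```

If callers do need to know which node reported which estimate, the nested form may still be justified; the sketch only covers the case where the table partition to size mapping is enough.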



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateMessage.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
+
+/**
+ * A message for reading estimated number of rows for tables stored on this node.
+ */
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE)
+public interface LocalTableStateMessage extends NetworkMessage {
+
+    /** Zone Partition ID. */
+    ZonePartitionIdMessage zonePartitionId();

Review Comment:
   Why do we need zoneId here?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateMessage.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.network.disaster;
+
+import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
+
+/**
+ * A message for reading estimated number of rows for tables stored on this node.
+ */
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE)
+public interface LocalTableStateMessage extends NetworkMessage {
+

Review Comment:
   Unnecessary empty line.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -845,16 +856,110 @@ public CompletableFuture<Map<TablePartitionId, GlobalTablePartitionState>> globa
         }
     }
 
+    /**
+     * Converts {@link LocalPartitionStateMessageByNode} to a mapping of zone names to the set of zone partitions.
+     *
+     * @param partitionStateMap Partition state map.
+     * @return Mapping of zone names to the set of zone partitions.
+     */
+    private static Map<String, Set<ZonePartitionId>> toZonesOnNodes(
+            Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap
+    ) {
+        Map<String, Set<ZonePartitionId>> res = new HashMap<>();
+
+        for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) {
+            ZonePartitionId zonePartitionId = entry.getKey();
+
+            LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue();
+
+            for (String nodeName : zoneLocalPartitionStateMessageByNode.nodes()) {
+                res.computeIfAbsent(nodeName, k -> new HashSet<>()).add(zonePartitionId);
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Returns estimated number of rows for each table having a partition in the specified zones.
+     *
+     * <p>The result is returned from the nodes specified in the {@code zonesOnNodes.keySet()} -
+     * these are the nodes we previously received partition states from.
+     *
+     * @param zonesOnNodes Mapping of node names to the set of zone partitions.
+     * @param catalogVersion Catalog version.
+     * @return Future with the mapping.
+     */
+    private CompletableFuture<Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>>> tableStateForZone(
+            Map<String, Set<ZonePartitionId>> zonesOnNodes,
+            int catalogVersion
+    ) {
+        Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>> result = new ConcurrentHashMap<>();
+
+        CompletableFuture<?>[] futures = zonesOnNodes.entrySet().stream()
+                .map(entry ->
+                        tableStateForZoneOnNode(catalogVersion, entry.getKey(), entry.getValue())
+                                .thenAccept(response ->
+                                        response.states().forEach(state -> {
+                                            ZonePartitionId zonePartitionId = state.zonePartitionId().asZonePartitionId();
+
+                                            result.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())

Review Comment:
   Why is it a ConcurrentHashMap?
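
One possible alternative, sketched under assumptions: if the concurrent map exists only because the `thenAccept` callbacks for different nodes may run on different threads and write into shared state, each per-node future could instead return its own map, and the results could be merged on a single thread once `CompletableFuture.allOf` completes. The class `MergeAfterAllOf` and the method `mergeAll` are hypothetical names, not part of the PR, and the key type is left generic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper, not part of the PR. */
public class MergeAfterAllOf {
    /**
     * Waits for all per-node futures and merges their maps into one plain HashMap.
     * The merge runs in a single callback after every future has completed,
     * so no concurrent collection is needed.
     * Assumption: if two nodes report the same key, the later future in the list wins.
     */
    public static <K> CompletableFuture<Map<K, Long>> mergeAll(List<CompletableFuture<Map<K, Long>>> perNode) {
        CompletableFuture<?>[] all = perNode.toArray(new CompletableFuture<?>[0]);

        return CompletableFuture.allOf(all).thenApply(unused -> {
            Map<K, Long> merged = new HashMap<>();

            for (CompletableFuture<Map<K, Long>> future : perNode) {
                // join() does not block here: allOf has already completed.
                merged.putAll(future.join());
            }

            return merged;
        });
    }
}
```

With this shape the shared mutable state disappears, which may make the choice of map type a non-issue.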



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateRequest.java:
##########
@@ -15,39 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table.distributed.disaster;
+package org.apache.ignite.internal.partition.replicator.network.disaster;
 
-import org.apache.ignite.internal.tostring.S;
+import java.util.List;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages;
+import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage;
 
 /**
- * Table state.
+ * Request for reading table states from the node.
  */
-public class TableState {
+@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE_REQUEST)
+public interface LocalTableStateRequest extends NetworkMessage {
+    List<ZonePartitionIdMessage> zonePartitionIds();

Review Comment:
   Is it important for zonePartitionIds to be ordered? In other words, why 
is it a list?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java:
##########
@@ -1100,9 +1203,45 @@ private void handleTriggerKeyUpdate(WatchEvent watchEvent) {
     private void handleMessage(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) {
         if (message instanceof LocalPartitionStatesRequest) {
             handleLocalPartitionStatesRequest((LocalPartitionStatesRequest) message, sender, correlationId);
+        } else if (message instanceof LocalTableStateRequest) {
+            handleLocalTableStateRequest((LocalTableStateRequest) message, sender, correlationId);
         }
     }
 
+    private void handleLocalTableStateRequest(LocalTableStateRequest request, ClusterNode sender, @Nullable Long correlationId) {
+        assert correlationId != null : "request=" + request + ", sender=" + sender;
+
+        int catalogVersion = request.catalogVersion();
+
+        Set<ZonePartitionId> requesedPartitions = request.zonePartitionIds().stream()
+                .map(ZonePartitionIdMessage::asZonePartitionId)
+                .collect(toSet());
+
+        catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> {
+            List<LocalTableStateMessage> statesList = new ArrayList<>();
+
+            raftManager.forEach((raftNodeId, raftGroupService) -> {
+                if (raftNodeId.groupId() instanceof ZonePartitionId) {
+
+                    LocalTableStateMessage message = handleSizeRequestForTable(

Review Comment:
   It's messy: the method name says we request the size for a table, but we 
pass zonePartitionIds as the parameter.



##########
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java:
##########
@@ -268,6 +270,86 @@ void testRestartZonePartitions() {
         assertThat(selectAll(), hasSize(4));
     }
 
+    @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, value = "false")
+    @Test
+    @ZoneParams(nodes = 2, replicas = 1, partitions = 2)

Review Comment:
   Should we also check it with replicas > 1?


