sanpwc commented on code in PR #6288: URL: https://github.com/apache/ignite-3/pull/6288#discussion_r2222303860
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateMessage.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.network.disaster; + +import java.util.Map; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage; + +/** + * A message for reading estimated number of rows for tables stored on this node. + */ +@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE) +public interface LocalTableStateMessage extends NetworkMessage { Review Comment: Same as above - LocalTablePartitionStateMessage. 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalPartitionStatesResponse.java: ########## @@ -23,7 +23,7 @@ import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages; /** - * Response for {@link LocalPartitionStatesResponse}. + * Response for {@link LocalPartitionStatesRequest}. Review Comment: It's response)) ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateRequest.java: ########## @@ -15,39 +15,20 @@ * limitations under the License. */ -package org.apache.ignite.internal.table.distributed.disaster; +package org.apache.ignite.internal.partition.replicator.network.disaster; -import org.apache.ignite.internal.tostring.S; +import java.util.List; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages; +import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage; /** - * Table state. + * Request for reading table states from the node. */ -public class TableState { +@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE_REQUEST) +public interface LocalTableStateRequest extends NetworkMessage { + List<ZonePartitionIdMessage> zonePartitionIds(); - private final int tableId; - private final String schemaName; Review Comment: Previously there was schemaName, why? Why it's removed? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateResponse.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.network.disaster; + +import java.util.List; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages; + +/** + * Response for {@link LocalTableStateRequest}. + */ +@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE_RESPONSE) +public interface LocalTableStateResponse extends NetworkMessage { + List<LocalTableStateMessage> states(); Review Comment: Why do we need list here? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/PartitionReplicationMessageGroup.java: ########## @@ -287,5 +290,14 @@ interface DisasterRecoveryMessages { /** Message type for {@link LocalPartitionStatesResponse}. */ short LOCAL_PARTITION_STATE_RESPONSE = 102; + + /** Message type for {@link LocalTableStateMessage}. */ + short LOCAL_TABLE_STATE = 103; Review Comment: I'd rather name it LOCAL_TABLE_**PARTITION**_STATE, etc. 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateMessage.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.network.disaster; + +import java.util.Map; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage; + +/** + * A message for reading estimated number of rows for tables stored on this node. + */ +@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE) +public interface LocalTableStateMessage extends NetworkMessage { + + /** Zone Partition ID. */ + ZonePartitionIdMessage zonePartitionId(); + + /** + * Returns estimated number of rows for partitions stored on this node. + * + * <p>When colocation is enabled, and we want to get estimated rows for each of the tables, this method should be used. 
Review Comment: The javadoc is a bit messy. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -845,16 +856,110 @@ public CompletableFuture<Map<TablePartitionId, GlobalTablePartitionState>> globa } } + /** + * Converts {@link LocalPartitionStateMessageByNode} to a mapping of zone names to the set of zone partitions. + * + * @param partitionStateMap Partition state map. + * @return Mapping of zone names to the set of zone partitions. + */ + private static Map<String, Set<ZonePartitionId>> toZonesOnNodes( + Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap + ) { + Map<String, Set<ZonePartitionId>> res = new HashMap<>(); + + for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) { + ZonePartitionId zonePartitionId = entry.getKey(); + + LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue(); + + for (String nodeName : zoneLocalPartitionStateMessageByNode.nodes()) { + res.computeIfAbsent(nodeName, k -> new HashSet<>()).add(zonePartitionId); + } + } + + return res; + } + + /** + * Returns estimated number of rows for each table having a partition in the specified zones. + * + * <p>The result is returned from the nodes specified in the {@code zonesOnNodes.keySet()} - + * these are the nodes we previously received partition states from. + * + * @param zonesOnNodes Mapping of node names to the set of zone partitions. + * @param catalogVersion Catalog version. + * @return Future with the mapping. + */ + private CompletableFuture<Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>>> tableStateForZone( Review Comment: Why is it such a complicated Map<Map, Map>? All we need is a mapping from TablePartitionId to estimated size, right?
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateMessage.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.network.disaster; + +import java.util.Map; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage; + +/** + * A message for reading estimated number of rows for tables stored on this node. + */ +@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE) +public interface LocalTableStateMessage extends NetworkMessage { + + /** Zone Partition ID. */ + ZonePartitionIdMessage zonePartitionId(); Review Comment: Why do we need zoneId here? 
########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateMessage.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.partition.replicator.network.disaster; + +import java.util.Map; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage; + +/** + * A message for reading estimated number of rows for tables stored on this node. + */ +@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE) +public interface LocalTableStateMessage extends NetworkMessage { + Review Comment: Unnecessary empty line. 
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -845,16 +856,110 @@ public CompletableFuture<Map<TablePartitionId, GlobalTablePartitionState>> globa } } + /** + * Converts {@link LocalPartitionStateMessageByNode} to a mapping of zone names to the set of zone partitions. + * + * @param partitionStateMap Partition state map. + * @return Mapping of zone names to the set of zone partitions. + */ + private static Map<String, Set<ZonePartitionId>> toZonesOnNodes( + Map<ZonePartitionId, LocalPartitionStateMessageByNode> partitionStateMap + ) { + Map<String, Set<ZonePartitionId>> res = new HashMap<>(); + + for (Map.Entry<ZonePartitionId, LocalPartitionStateMessageByNode> entry : partitionStateMap.entrySet()) { + ZonePartitionId zonePartitionId = entry.getKey(); + + LocalPartitionStateMessageByNode zoneLocalPartitionStateMessageByNode = entry.getValue(); + + for (String nodeName : zoneLocalPartitionStateMessageByNode.nodes()) { + res.computeIfAbsent(nodeName, k -> new HashSet<>()).add(zonePartitionId); + } + } + + return res; + } + + /** + * Returns estimated number of rows for each table having a partition in the specified zones. + * + * <p>The result is returned from the nodes specified in the {@code zonesOnNodes.keySet()} - + * these are the nodes we previously received partition states from. + * + * @param zonesOnNodes Mapping of node names to the set of zone partitions. + * @param catalogVersion Catalog version. + * @return Future with the mapping. 
+ */ + private CompletableFuture<Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>>> tableStateForZone( + Map<String, Set<ZonePartitionId>> zonesOnNodes, + int catalogVersion + ) { + Map<String, Map<ZonePartitionId, Map<TablePartitionIdMessage, Long>>> result = new ConcurrentHashMap<>(); + + CompletableFuture<?>[] futures = zonesOnNodes.entrySet().stream() + .map(entry -> + tableStateForZoneOnNode(catalogVersion, entry.getKey(), entry.getValue()) + .thenAccept(response -> + response.states().forEach(state -> { + ZonePartitionId zonePartitionId = state.zonePartitionId().asZonePartitionId(); + + result.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>()) Review Comment: Why is it a ConcurrentHashMap? ########## modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/disaster/LocalTableStateRequest.java: ########## @@ -15,39 +15,20 @@ * limitations under the License. */ -package org.apache.ignite.internal.table.distributed.disaster; +package org.apache.ignite.internal.partition.replicator.network.disaster; -import org.apache.ignite.internal.tostring.S; +import java.util.List; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup.DisasterRecoveryMessages; +import org.apache.ignite.internal.replicator.message.ZonePartitionIdMessage; /** - * Table state. + * Request for reading table states from the node. */ -public class TableState { +@Transferable(DisasterRecoveryMessages.LOCAL_TABLE_STATE_REQUEST) +public interface LocalTableStateRequest extends NetworkMessage { + List<ZonePartitionIdMessage> zonePartitionIds(); Review Comment: Is it important for zonePartitionIds to be ordered? Or, in other words, why is it a List?
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -1100,9 +1203,45 @@ private void handleTriggerKeyUpdate(WatchEvent watchEvent) { private void handleMessage(NetworkMessage message, ClusterNode sender, @Nullable Long correlationId) { if (message instanceof LocalPartitionStatesRequest) { handleLocalPartitionStatesRequest((LocalPartitionStatesRequest) message, sender, correlationId); + } else if (message instanceof LocalTableStateRequest) { + handleLocalTableStateRequest((LocalTableStateRequest) message, sender, correlationId); } } + private void handleLocalTableStateRequest(LocalTableStateRequest request, ClusterNode sender, @Nullable Long correlationId) { + assert correlationId != null : "request=" + request + ", sender=" + sender; + + int catalogVersion = request.catalogVersion(); + + Set<ZonePartitionId> requesedPartitions = request.zonePartitionIds().stream() + .map(ZonePartitionIdMessage::asZonePartitionId) + .collect(toSet()); + + catalogManager.catalogReadyFuture(catalogVersion).thenRunAsync(() -> { + List<LocalTableStateMessage> statesList = new ArrayList<>(); + + raftManager.forEach((raftNodeId, raftGroupService) -> { + if (raftNodeId.groupId() instanceof ZonePartitionId) { + + LocalTableStateMessage message = handleSizeRequestForTable( Review Comment: It's messy: the method name says we request the size for a table, but we pass zonePartitionIds as a parameter. ########## modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java: ########## @@ -268,6 +270,86 @@ void testRestartZonePartitions() { assertThat(selectAll(), hasSize(4)); } + @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, value = "false") + @Test + @ZoneParams(nodes = 2, replicas = 1, partitions = 2) Review Comment: Should we also check it with replicas > 1? -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org