tkalkirill commented on code in PR #3379: URL: https://github.com/apache/ignite-3/pull/3379#discussion_r1543010825
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/LocalPartitionStateEnum.java: ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.disaster; + +/** + * Enum for states of local partitions. + */ +public enum LocalPartitionStateEnum { Review Comment: Add a description to each element. ########## modules/raft/src/main/java/org/apache/ignite/raft/jraft/Node.java: ########## @@ -317,4 +317,14 @@ public interface Node extends Lifecycle<NodeOptions>, Describer { * @return node's current term. */ long getCurrentTerm(); + + /** + * Returns {@code true} if node is currently in the process of installing a snapshot. + */ + boolean isInstallingSnapshot(); + + /** + * Returns the value of last replicated log index. Corresponding log entry might not yet be written to the log storage (no flush). + */ + long lastLogIndex(); Review Comment: Maybe rename it to `lastReplicatedLogIndex`?
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.disaster; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.ignite.internal.affinity.Assignments; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.distributionzones.DistributionZoneManager; +import org.apache.ignite.internal.distributionzones.NodeWithAttributes; +import 
org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.raft.jraft.Node; +import org.apache.ignite.raft.jraft.core.State; +import org.jetbrains.annotations.Nullable; + +/** + * Manager, responsible for "disaster recovery" operations. + * Internally it triggers meta-storage updates, in order to acquire unique causality token. + * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(int, int)}. + * More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>. + */ +public class DisasterRecoveryManager implements IgniteComponent { + /** Logger. 
*/ + private static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class); + + /** Single key for writing disaster recovery requests into meta-storage. */ + static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger"); + + private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); + + private static final int TIMEOUT = 30; + + private static final int CATCH_UP_THRESHOLD = 10; + + /** Thread pool executor for async parts. */ + private final ExecutorService threadPool; + + /** Messaging service. */ + private final MessagingService messagingService; + + /** Meta-storage manager. */ + final MetaStorageManager metaStorageManager; + + /** Catalog manager. */ + final CatalogManager catalogManager; + + /** Distribution zone manager. */ + final DistributionZoneManager dzManager; + + /** Raft manager. */ + private final Loza raftManager; + + /** Watch listener for {@link #RECOVERY_TRIGGER_KEY}. */ + private final WatchListener watchListener; + + /** + * Map of operations, triggered by local node, that have not yet been processed by {@link #watchListener}. Values in the map are the + * futures, returned from the {@link #processNewRequest(ManualGroupUpdateRequest)}, they are completed by + * {@link #handleTriggerKeyUpdate(WatchEvent)} when node receives corresponding events from the metastorage (or if it doesn't receive + * this event within a 30 seconds window). + */ + private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<>(); + + /** + * Constructor. 
+ */ + public DisasterRecoveryManager( + ExecutorService threadPool, + MessagingService messagingService, + MetaStorageManager metaStorageManager, + CatalogManager catalogManager, + DistributionZoneManager dzManager, + Loza raftManager + ) { + this.threadPool = threadPool; + this.messagingService = messagingService; + this.metaStorageManager = metaStorageManager; + this.catalogManager = catalogManager; + this.dzManager = dzManager; + this.raftManager = raftManager; + + watchListener = new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + return handleTriggerKeyUpdate(event); + } + + @Override + public void onError(Throwable e) { + // No-op. + } + }; + + messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage); + } + + @Override + public CompletableFuture<Void> start() { + metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, watchListener); + + return nullCompletedFuture(); + } + + @Override + public void stop() throws Exception { + metaStorageManager.unregisterWatch(watchListener); + } + + /** + * Updates assignments of the table in a forced manner, allowing for the recovery of raft group with lost majorities. It is achieved via + * triggering a new rebalance with {@code force} flag enabled in {@link Assignments} for partitions where it's required. New pending + * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers" + * so that a new leader could be elected. + * + * @param zoneId Distribution zone ID. + * @param tableId Table ID. + * @return Operation future. + */ + public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) { + return processNewRequest(new ManualGroupUpdateRequest(UUID.randomUUID(), zoneId, tableId)); + } + + /** + * Returns partition states for all zones' partitions in the cluster. 
Result is a mapping of {@link TablePartitionId} to the mapping + * between a node name and a partition state. + * + * @param zoneName Zone name. + * @return Future with the mapping. + */ + public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> partitionStates(String zoneName) { + int latestCatalogVersion = catalogManager.latestCatalogVersion(); + Optional<CatalogZoneDescriptor> zoneDesciptorOptional = catalogManager.zones(latestCatalogVersion).stream() + .filter(catalogZoneDescriptor -> catalogZoneDescriptor.name().equals(zoneName)) + .findAny(); + + if (zoneDesciptorOptional.isEmpty()) { + return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName, null)); + } + + CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get(); + + Set<NodeWithAttributes> logicalTopology = dzManager.logicalTopology(); + + LocalPartitionStatesRequest localPartitionStatesRequest = MSG_FACTORY.localPartitionStatesRequest() + .zoneId(zoneDescriptor.id()) + .catalogVersion(latestCatalogVersion) + .build(); + + Map<TablePartitionId, Map<String, LocalPartitionState>> result = new ConcurrentHashMap<>(); + List<CompletableFuture<?>> futures = new ArrayList<>(); Review Comment: It would be possible to use an array right away, Meow. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.disaster; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.ignite.internal.affinity.Assignments; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.distributionzones.DistributionZoneManager; +import org.apache.ignite.internal.distributionzones.NodeWithAttributes; +import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import 
org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.raft.jraft.Node; +import org.apache.ignite.raft.jraft.core.State; +import org.jetbrains.annotations.Nullable; + +/** + * Manager, responsible for "disaster recovery" operations. + * Internally it triggers meta-storage updates, in order to acquire unique causality token. + * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(int, int)}. + * More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>. + */ +public class DisasterRecoveryManager implements IgniteComponent { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class); + + /** Single key for writing disaster recovery requests into meta-storage. */ + static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger"); + + private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); + + private static final int TIMEOUT = 30; + + private static final int CATCH_UP_THRESHOLD = 10; + + /** Thread pool executor for async parts. */ + private final ExecutorService threadPool; + + /** Messaging service. 
*/ + private final MessagingService messagingService; + + /** Meta-storage manager. */ + final MetaStorageManager metaStorageManager; + + /** Catalog manager. */ + final CatalogManager catalogManager; + + /** Distribution zone manager. */ + final DistributionZoneManager dzManager; + + /** Raft manager. */ + private final Loza raftManager; + + /** Watch listener for {@link #RECOVERY_TRIGGER_KEY}. */ + private final WatchListener watchListener; + + /** + * Map of operations, triggered by local node, that have not yet been processed by {@link #watchListener}. Values in the map are the + * futures, returned from the {@link #processNewRequest(ManualGroupUpdateRequest)}, they are completed by + * {@link #handleTriggerKeyUpdate(WatchEvent)} when node receives corresponding events from the metastorage (or if it doesn't receive + * this event within a 30 seconds window). + */ + private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<>(); + + /** + * Constructor. + */ + public DisasterRecoveryManager( + ExecutorService threadPool, + MessagingService messagingService, + MetaStorageManager metaStorageManager, + CatalogManager catalogManager, + DistributionZoneManager dzManager, + Loza raftManager + ) { + this.threadPool = threadPool; + this.messagingService = messagingService; + this.metaStorageManager = metaStorageManager; + this.catalogManager = catalogManager; + this.dzManager = dzManager; + this.raftManager = raftManager; + + watchListener = new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + return handleTriggerKeyUpdate(event); + } + + @Override + public void onError(Throwable e) { + // No-op. 
+ } + }; + + messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage); + } + + @Override + public CompletableFuture<Void> start() { + metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, watchListener); + + return nullCompletedFuture(); + } + + @Override + public void stop() throws Exception { + metaStorageManager.unregisterWatch(watchListener); + } + + /** + * Updates assignments of the table in a forced manner, allowing for the recovery of raft group with lost majorities. It is achieved via + * triggering a new rebalance with {@code force} flag enabled in {@link Assignments} for partitions where it's required. New pending + * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers" + * so that a new leader could be elected. + * + * @param zoneId Distribution zone ID. + * @param tableId Table ID. + * @return Operation future. + */ + public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) { + return processNewRequest(new ManualGroupUpdateRequest(UUID.randomUUID(), zoneId, tableId)); + } + + /** + * Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the mapping + * between a node name and a partition state. + * + * @param zoneName Zone name. + * @return Future with the mapping. + */ + public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> partitionStates(String zoneName) { Review Comment: Why zoneName and not zoneId? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.disaster; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.ignite.internal.affinity.Assignments; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.distributionzones.DistributionZoneManager; +import org.apache.ignite.internal.distributionzones.NodeWithAttributes; +import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.Entry; +import 
org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.raft.jraft.Node; +import org.apache.ignite.raft.jraft.core.State; +import org.jetbrains.annotations.Nullable; + +/** + * Manager, responsible for "disaster recovery" operations. + * Internally it triggers meta-storage updates, in order to acquire unique causality token. + * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(int, int)}. + * More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>. + */ +public class DisasterRecoveryManager implements IgniteComponent { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class); + + /** Single key for writing disaster recovery requests into meta-storage. 
*/ + static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger"); + + private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); + + private static final int TIMEOUT = 30; + + private static final int CATCH_UP_THRESHOLD = 10; + + /** Thread pool executor for async parts. */ + private final ExecutorService threadPool; + + /** Messaging service. */ + private final MessagingService messagingService; + + /** Meta-storage manager. */ + final MetaStorageManager metaStorageManager; + + /** Catalog manager. */ + final CatalogManager catalogManager; + + /** Distribution zone manager. */ + final DistributionZoneManager dzManager; + + /** Raft manager. */ + private final Loza raftManager; + + /** Watch listener for {@link #RECOVERY_TRIGGER_KEY}. */ + private final WatchListener watchListener; + + /** + * Map of operations, triggered by local node, that have not yet been processed by {@link #watchListener}. Values in the map are the + * futures, returned from the {@link #processNewRequest(ManualGroupUpdateRequest)}, they are completed by + * {@link #handleTriggerKeyUpdate(WatchEvent)} when node receives corresponding events from the metastorage (or if it doesn't receive + * this event within a 30 seconds window). + */ + private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<>(); + + /** + * Constructor. 
+ */ + public DisasterRecoveryManager( + ExecutorService threadPool, + MessagingService messagingService, + MetaStorageManager metaStorageManager, + CatalogManager catalogManager, + DistributionZoneManager dzManager, + Loza raftManager + ) { + this.threadPool = threadPool; + this.messagingService = messagingService; + this.metaStorageManager = metaStorageManager; + this.catalogManager = catalogManager; + this.dzManager = dzManager; + this.raftManager = raftManager; + + watchListener = new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + return handleTriggerKeyUpdate(event); + } + + @Override + public void onError(Throwable e) { + // No-op. + } + }; + + messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage); Review Comment: Shouldn't it be in `#start`? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table.distributed.disaster; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.ignite.internal.affinity.Assignments; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.distributionzones.DistributionZoneManager; +import org.apache.ignite.internal.distributionzones.NodeWithAttributes; +import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import 
org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.raft.jraft.Node; +import org.apache.ignite.raft.jraft.core.State; +import org.jetbrains.annotations.Nullable; + +/** + * Manager, responsible for "disaster recovery" operations. + * Internally it triggers meta-storage updates, in order to acquire unique causality token. + * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(int, int)}. + * More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>. + */ +public class DisasterRecoveryManager implements IgniteComponent { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class); + + /** Single key for writing disaster recovery requests into meta-storage. */ + static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger"); + + private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); + + private static final int TIMEOUT = 30; + + private static final int CATCH_UP_THRESHOLD = 10; + + /** Thread pool executor for async parts. */ + private final ExecutorService threadPool; + + /** Messaging service. */ + private final MessagingService messagingService; + + /** Meta-storage manager. */ + final MetaStorageManager metaStorageManager; + + /** Catalog manager. */ + final CatalogManager catalogManager; + + /** Distribution zone manager. */ + final DistributionZoneManager dzManager; + + /** Raft manager. */ + private final Loza raftManager; + + /** Watch listener for {@link #RECOVERY_TRIGGER_KEY}. 
*/ + private final WatchListener watchListener; + + /** + * Map of operations, triggered by local node, that have not yet been processed by {@link #watchListener}. Values in the map are the + * futures, returned from the {@link #processNewRequest(ManualGroupUpdateRequest)}, they are completed by + * {@link #handleTriggerKeyUpdate(WatchEvent)} when node receives corresponding events from the metastorage (or if it doesn't receive + * this event within a 30 seconds window). + */ + private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<>(); + + /** + * Constructor. + */ + public DisasterRecoveryManager( + ExecutorService threadPool, + MessagingService messagingService, + MetaStorageManager metaStorageManager, + CatalogManager catalogManager, + DistributionZoneManager dzManager, + Loza raftManager + ) { + this.threadPool = threadPool; + this.messagingService = messagingService; + this.metaStorageManager = metaStorageManager; + this.catalogManager = catalogManager; + this.dzManager = dzManager; + this.raftManager = raftManager; + + watchListener = new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + return handleTriggerKeyUpdate(event); + } + + @Override + public void onError(Throwable e) { + // No-op. + } + }; + + messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage); + } + + @Override + public CompletableFuture<Void> start() { + metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, watchListener); + + return nullCompletedFuture(); + } + + @Override + public void stop() throws Exception { + metaStorageManager.unregisterWatch(watchListener); + } + + /** + * Updates assignments of the table in a forced manner, allowing for the recovery of raft group with lost majorities. It is achieved via + * triggering a new rebalance with {@code force} flag enabled in {@link Assignments} for partitions where it's required. 
New pending + * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers" + * so that a new leader could be elected. + * + * @param zoneId Distribution zone ID. + * @param tableId Table ID. + * @return Operation future. + */ + public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) { + return processNewRequest(new ManualGroupUpdateRequest(UUID.randomUUID(), zoneId, tableId)); + } + + /** + * Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the mapping + * between a node name and a partition state. + * + * @param zoneName Zone name. + * @return Future with the mapping. + */ + public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> partitionStates(String zoneName) { + int latestCatalogVersion = catalogManager.latestCatalogVersion(); Review Comment: You probably need to add a TODO so that this is not forgotten when implementing catalog compaction. ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java: ########## @@ -494,6 +495,16 @@ public void resetPeers(RaftNodeId raftNodeId, PeersAndLearners peersAndLearners) raftServer.resetPeers(raftNodeId, peersAndLearners); } + /** + * Iterates over all currently started raft services. Doesn't block the starting or stopping of other services, so consumer may + * accidentally receive stopped service. + * + * @param consumer Closure to process each service. + */ + public void forEach(BiConsumer<RaftNodeId, org.apache.ignite.raft.jraft.RaftGroupService> consumer) { Review Comment: From what I see, this is not a separate interface, MEOW. ########## modules/table/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java: ########## @@ -67,7 +71,7 @@ import org.junit.jupiter.api.TestInfo; /** - * Tests for scenarios where pajority of peers is not available.
+ * Tests for scenarios where majority of peers is not available. Review Comment: LOL ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/DisasterRecoveryManager.java: ########## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table.distributed.disaster; + +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.ignite.internal.affinity.Assignments; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.apache.ignite.internal.distributionzones.DistributionZoneManager; +import org.apache.ignite.internal.distributionzones.NodeWithAttributes; +import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException; +import org.apache.ignite.internal.lang.ByteArray; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.manager.IgniteComponent; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.WatchEvent; +import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.raft.Loza; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.table.distributed.TableMessageGroup; +import org.apache.ignite.internal.table.distributed.TableMessagesFactory; +import 
org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionState; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesRequest; +import org.apache.ignite.internal.table.distributed.disaster.messages.LocalPartitionStatesResponse; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.raft.jraft.Node; +import org.apache.ignite.raft.jraft.core.State; +import org.jetbrains.annotations.Nullable; + +/** + * Manager, responsible for "disaster recovery" operations. + * Internally it triggers meta-storage updates, in order to acquire unique causality token. + * As a reaction to these updates, manager performs actual recovery operations, such as {@link #resetPartitions(int, int)}. + * More details are in the <a href="https://issues.apache.org/jira/browse/IGNITE-21140">epic</a>. + */ +public class DisasterRecoveryManager implements IgniteComponent { + /** Logger. */ + private static final IgniteLogger LOG = Loggers.forClass(DisasterRecoveryManager.class); + + /** Single key for writing disaster recovery requests into meta-storage. */ + static final ByteArray RECOVERY_TRIGGER_KEY = new ByteArray("disaster.recovery.trigger"); + + private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory(); + + private static final int TIMEOUT = 30; + + private static final int CATCH_UP_THRESHOLD = 10; + + /** Thread pool executor for async parts. */ + private final ExecutorService threadPool; + + /** Messaging service. */ + private final MessagingService messagingService; + + /** Meta-storage manager. */ + final MetaStorageManager metaStorageManager; + + /** Catalog manager. */ + final CatalogManager catalogManager; + + /** Distribution zone manager. */ + final DistributionZoneManager dzManager; + + /** Raft manager. */ + private final Loza raftManager; + + /** Watch listener for {@link #RECOVERY_TRIGGER_KEY}. 
*/ + private final WatchListener watchListener; + + /** + * Map of operations, triggered by local node, that have not yet been processed by {@link #watchListener}. Values in the map are the + * futures, returned from the {@link #processNewRequest(ManualGroupUpdateRequest)}, they are completed by + * {@link #handleTriggerKeyUpdate(WatchEvent)} when node receives corresponding events from the metastorage (or if it doesn't receive + * this event within a 30 seconds window). + */ + private final Map<UUID, CompletableFuture<Void>> ongoingOperationsById = new ConcurrentHashMap<>(); + + /** + * Constructor. + */ + public DisasterRecoveryManager( + ExecutorService threadPool, + MessagingService messagingService, + MetaStorageManager metaStorageManager, + CatalogManager catalogManager, + DistributionZoneManager dzManager, + Loza raftManager + ) { + this.threadPool = threadPool; + this.messagingService = messagingService; + this.metaStorageManager = metaStorageManager; + this.catalogManager = catalogManager; + this.dzManager = dzManager; + this.raftManager = raftManager; + + watchListener = new WatchListener() { + @Override + public CompletableFuture<Void> onUpdate(WatchEvent event) { + return handleTriggerKeyUpdate(event); + } + + @Override + public void onError(Throwable e) { + // No-op. + } + }; + + messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage); + } + + @Override + public CompletableFuture<Void> start() { + metaStorageManager.registerExactWatch(RECOVERY_TRIGGER_KEY, watchListener); + + return nullCompletedFuture(); + } + + @Override + public void stop() throws Exception { + metaStorageManager.unregisterWatch(watchListener); + } + + /** + * Updates assignments of the table in a forced manner, allowing for the recovery of raft group with lost majorities. It is achieved via + * triggering a new rebalance with {@code force} flag enabled in {@link Assignments} for partitions where it's required. 
New pending + * assignments with {@code force} flag remove old stable nodes from the distribution, and force new Raft configuration via "resetPeers" + * so that a new leader could be elected. + * + * @param zoneId Distribution zone ID. + * @param tableId Table ID. + * @return Operation future. + */ + public CompletableFuture<Void> resetPartitions(int zoneId, int tableId) { + return processNewRequest(new ManualGroupUpdateRequest(UUID.randomUUID(), zoneId, tableId)); + } + + /** + * Returns partition states for all zones' partitions in the cluster. Result is a mapping of {@link TablePartitionId} to the mapping + * between a node name and a partition state. + * + * @param zoneName Zone name. + * @return Future with the mapping. + */ + public CompletableFuture<Map<TablePartitionId, Map<String, LocalPartitionState>>> partitionStates(String zoneName) { + int latestCatalogVersion = catalogManager.latestCatalogVersion(); + Optional<CatalogZoneDescriptor> zoneDesciptorOptional = catalogManager.zones(latestCatalogVersion).stream() + .filter(catalogZoneDescriptor -> catalogZoneDescriptor.name().equals(zoneName)) + .findAny(); + + if (zoneDesciptorOptional.isEmpty()) { + return CompletableFuture.failedFuture(new DistributionZoneNotFoundException(zoneName, null)); + } + + CatalogZoneDescriptor zoneDescriptor = zoneDesciptorOptional.get(); + + Set<NodeWithAttributes> logicalTopology = dzManager.logicalTopology(); + + LocalPartitionStatesRequest localPartitionStatesRequest = MSG_FACTORY.localPartitionStatesRequest() + .zoneId(zoneDescriptor.id()) + .catalogVersion(latestCatalogVersion) + .build(); + + Map<TablePartitionId, Map<String, LocalPartitionState>> result = new ConcurrentHashMap<>(); + List<CompletableFuture<?>> futures = new ArrayList<>(); + + for (NodeWithAttributes node : logicalTopology) { + CompletableFuture<NetworkMessage> invokeFuture = messagingService.invoke( + node.nodeName(), + localPartitionStatesRequest, + TimeUnit.SECONDS.toMillis(TIMEOUT) + ); + + 
futures.add(invokeFuture.thenAccept(networkMessage -> { + assert networkMessage instanceof LocalPartitionStatesResponse : networkMessage; + + var response = (LocalPartitionStatesResponse) networkMessage; + + for (LocalPartitionState state : response.states()) { + result.compute(state.partitionId().asTablePartitionId(), (tablePartitionId, map) -> { + if (map == null) { + return Map.of(node.nodeName(), state); + } + + map = new HashMap<>(map); + map.put(node.nodeName(), state); + return map; + }); + } + })); + } + + return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).handle((unused, throwable) -> normalize(result)); + } + + /** + * Creates new operation future, associated with the request, and writes it into meta-storage. + * + * @param request Request. + * @return Operation future. + */ + private CompletableFuture<Void> processNewRequest(ManualGroupUpdateRequest request) { + UUID operationId = request.operationId(); + + CompletableFuture<Void> operationFuture = new CompletableFuture<Void>() + .whenComplete((v, throwable) -> ongoingOperationsById.remove(operationId)) + .orTimeout(TIMEOUT, TimeUnit.SECONDS); + + ongoingOperationsById.put(operationId, operationFuture); Review Comment: Please leave a comment explaining where these futures are completed. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/disaster/messages/LocalPartitionState.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.disaster.messages; + +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.network.annotations.Marshallable; +import org.apache.ignite.internal.network.annotations.Transferable; +import org.apache.ignite.internal.table.distributed.TableMessageGroup.DisasterRecoveryMessages; +import org.apache.ignite.internal.table.distributed.command.TablePartitionIdMessage; +import org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateEnum; + +/** + * Local partition state message, has partition ID, state and last committed log index. + */ +@Transferable(DisasterRecoveryMessages.LOCAL_PARTITION_STATE) +public interface LocalPartitionState extends NetworkMessage { Review Comment: Please add comments to the fields. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
