This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 2f405b3 [NO ISSUE][CLUS] Add node active partitions config 2f405b3 is described below commit 2f405b312c75f947cb947c3f915f10c2b7f812c0 Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Thu Aug 5 03:13:36 2021 +0300 [NO ISSUE][CLUS] Add node active partitions config - user model changes: no - storage format changes: no - interface changes: yes Details: - Add a new config (ACTIVE_PARTITIONS) that contains the current list of active partitions on a node. - By default, a node's active partitions list is the same as the node's assigned partitions. - Pass node active partitions to CC during bootstrap tasks. - Adapt test cases. Change-Id: Ia91e15897221f512aeeccbbe134f1d91db8aa629 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12663 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 9 ++---- .../app/replication/NcLifecycleCoordinator.java | 23 +++++++------- .../message/NCLifecycleTaskReportMessage.java | 11 ++++++- .../message/RegistrationTasksRequestMessage.java | 15 ++++++--- .../message/RegistrationTasksResponseMessage.java | 5 ++- .../asterix/hyracks/bootstrap/NCApplication.java | 3 +- .../asterix/runtime/ClusterStateManagerTest.java | 21 ++++++++++++- .../common/cluster/IClusterStateManager.java | 4 ++- .../asterix/common/config/MetadataProperties.java | 5 +++ .../asterix/common/config/NodeProperties.java | 5 +-- .../asterix/common/config/PropertiesAccessor.java | 13 ++++++++ .../asterix/runtime/utils/ClusterStateManager.java | 36 +++++++++++++--------- 12 files changed, 106 insertions(+), 44 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 9aa433f..eb8a92f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -21,12 +21,10 @@ package org.apache.asterix.app.nc; import java.io.IOException; import java.rmi.RemoteException; import java.rmi.server.UnicastRemoteObject; -import java.util.Arrays; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.stream.Collectors; import org.apache.asterix.active.ActiveManager; import org.apache.asterix.common.api.IConfigValidator; @@ -38,7 +36,6 @@ import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.api.IPropertiesFactory; import org.apache.asterix.common.api.IReceptionist; import org.apache.asterix.common.api.IReceptionistFactory; -import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ActiveProperties; import org.apache.asterix.common.config.BuildProperties; import org.apache.asterix.common.config.CompilerProperties; @@ -225,10 +222,8 @@ public class NCAppRuntimeContext implements INcApplicationContext { new DatasetLifecycleManager(storageProperties, localResourceRepository, txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, ioManager.getIODevices().size()); final String nodeId = getServiceContext().getNodeId(); - final ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId); - final Set<Integer> nodePartitionsIds = - Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet()); - replicaManager = new ReplicaManager(this, nodePartitionsIds); + final Set<Integer> nodePartitions = metadataProperties.getNodeActivePartitions(nodeId); + 
replicaManager = new ReplicaManager(this, nodePartitions); isShuttingdown = false; activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(), activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(), diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index c4e4f82..f164773 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -26,14 +26,12 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import org.apache.asterix.app.nc.task.BindMetadataNodeTask; import org.apache.asterix.app.nc.task.CheckpointTask; @@ -51,7 +49,6 @@ import org.apache.asterix.app.replication.message.RegistrationTasksRequestMessag import org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.api.INCLifecycleTask; -import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; @@ -102,7 +99,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { @Override public void notifyNodeFailure(String nodeId, InetSocketAddress replicaAddress) throws HyracksDataException { pendingStartupCompletionNodes.remove(nodeId); - 
clusterManager.updateNodeState(nodeId, false, null); + clusterManager.updateNodeState(nodeId, false, null, null); if (nodeId.equals(metadataNodeId)) { clusterManager.updateMetadataNode(metadataNodeId, false); } @@ -138,7 +135,8 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { private void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); nodeSecretsMap.put(nodeId, msg.getSecrets()); - List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState()); + List<INCLifecycleTask> tasks = + buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), msg.getState(), msg.getActivePartitions()); RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); try { messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); @@ -157,7 +155,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { return; } if (msg.isSuccess()) { - clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters()); + clusterManager.updateNodeState(msg.getNodeId(), true, msg.getLocalCounters(), msg.getActivePartitions()); if (msg.getNodeId().equals(metadataNodeId)) { clusterManager.updateMetadataNode(metadataNodeId, true); } @@ -167,7 +165,8 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } } - protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state) { + protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state, + Set<Integer> activePartitions) { LOGGER.info("Building registration tasks for node {} with status {} and system state: {}", nodeId, nodeStatus, state); final boolean isMetadataNode = nodeId.equals(metadataNodeId); @@ -175,7 +174,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { case ACTIVE: return buildActiveNCRegTasks(isMetadataNode); case IDLE: 
- return buildIdleNcRegTasks(nodeId, isMetadataNode, state); + return buildIdleNcRegTasks(nodeId, isMetadataNode, state, activePartitions); default: return new ArrayList<>(); } @@ -210,13 +209,13 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } } - protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state) { + protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state, + Set<Integer> activePartitions) { final List<INCLifecycleTask> tasks = new ArrayList<>(); tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING)); if (state == SystemState.CORRUPTED) { - // need to perform local recovery for node partitions - LocalRecoveryTask rt = new LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId)) - .map(ClusterPartition::getPartitionId).collect(Collectors.toSet())); + // need to perform local recovery for node active partitions + LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions); tasks.add(rt); } if (replicationEnabled) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java index 79fa7c8..1309369 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.app.replication.message; +import java.util.Set; + import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; @@ -31,11 +33,14 @@ public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, 
ICcAdd private final boolean success; private Throwable exception; private final NcLocalCounters localCounters; + private final Set<Integer> activePartitions; - public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters) { + public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters, + Set<Integer> activePartitions) { this.nodeId = nodeId; this.success = success; this.localCounters = localCounters; + this.activePartitions = activePartitions; } @Override @@ -67,4 +72,8 @@ public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAdd public MessageType getType() { return MessageType.REGISTRATION_TASKS_RESULT; } + + public Set<Integer> getActivePartitions() { + return activePartitions; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java index c2cc63c..fb50b3e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java @@ -20,6 +20,7 @@ package org.apache.asterix.app.replication.message; import java.util.HashMap; import java.util.Map; +import java.util.Set; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; @@ -42,20 +43,22 @@ public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc protected final String nodeId; protected final NodeStatus nodeStatus; protected final Map<String, Object> secrets; + protected final Set<Integer> activePartitions; public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state, - Map<String, Object> secretsEphemeral) { + 
Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) { this.state = state; this.nodeId = nodeId; this.nodeStatus = nodeStatus; this.secrets = new HashMap<>(secretsEphemeral); + this.activePartitions = activePartitions; } public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState, - Map<String, Object> secretsEphemeral) throws HyracksDataException { + Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException { try { - RegistrationTasksRequestMessage msg = - new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral); + RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, + systemState, secretsEphemeral, activePartitions); ((INCMessageBroker) cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg); } catch (Exception e) { LOGGER.log(Level.ERROR, "Unable to send RegistrationTasksRequestMessage to CC", e); @@ -88,4 +91,8 @@ public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc public Map<String, Object> getSecrets() { return secrets; } + + public Set<Integer> getActivePartitions() { + return activePartitions; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java index 1227a6f..d8e1894 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java @@ -19,6 +19,7 @@ package org.apache.asterix.app.replication.message; import java.util.List; +import java.util.Set; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; @@ 
-72,7 +73,9 @@ public class RegistrationTasksResponseMessage extends CcIdentifiedMessage } NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(), (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null; - NCLifecycleTaskReportMessage result = new NCLifecycleTaskReportMessage(nodeId, success, localCounter); + Set<Integer> nodeActivePartitions = appCtx.getMetadataProperties().getNodeActivePartitions(nodeId); + NCLifecycleTaskReportMessage result = + new NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions); result.setException(exception); try { broker.sendMessageToCC(getCcId(), result); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index c148c92..5b10512 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -300,7 +300,8 @@ public class NCApplication extends BaseNCApplication { apiServer != null ? 
Collections.singletonMap(SYS_AUTH_HEADER, apiServer.ctx().get(SYS_AUTH_HEADER)) : Collections.emptyMap(); RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) ncServiceCtx.getControllerService(), - currentStatus, systemState, httpSecrets); + currentStatus, systemState, httpSecrets, + runtimeContext.getMetadataProperties().getNodeActivePartitions(nodeId)); } @Override diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index 9cc295e..b80fa30 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -19,7 +19,9 @@ package org.apache.asterix.runtime; import java.util.Collections; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -200,7 +202,8 @@ public class ClusterStateManagerTest { private void notifyNodeStartupCompletion(CcApplicationContext applicationContext, String nodeId) throws HyracksDataException { - NCLifecycleTaskReportMessage msg = new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters()); + NCLifecycleTaskReportMessage msg = + new NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters(), getNodeActivePartitions(nodeId)); applicationContext.getNcLifecycleCoordinator().process(msg); } @@ -262,4 +265,20 @@ public class ClusterStateManagerTest { Mockito.when(localCounters.getMaxTxnId()).thenReturn(1000L); return localCounters; } + + private static Set<Integer> getNodeActivePartitions(String nodeId) { + Set<Integer> activePartitions = new HashSet<>(); + switch (nodeId) { + case NC1: + activePartitions.add(0); + break; + case NC2: + activePartitions.add(1); + break; + case NC3: + activePartitions.add(2); + break; + } 
+ return activePartitions; + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index a37e6e4..2ee9435 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -62,9 +62,11 @@ public interface IClusterStateManager { * @param nodeId * @param active * @param ncLocalCounters + * @param activePartitions * @throws HyracksDataException */ - void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters) throws HyracksDataException; + void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> activePartitions) + throws HyracksDataException; /** * Updates the active node and active state of the cluster partition with id {@code partitionNum} diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java index 7d5ec42..31708d3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java @@ -24,6 +24,7 @@ import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import org.apache.asterix.common.cluster.ClusterPartition; @@ -123,6 +124,10 @@ public class MetadataProperties extends AbstractProperties { return accessor.getClusterPartitions(); } + public Set<Integer> getNodeActivePartitions(String nodeId) { + return accessor.getActivePartitions(nodeId); + } + public Map<String, String> 
getTransactionLogDirs() { return accessor.getTransactionLogDirs(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java index aaf6316..a03530d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java @@ -46,7 +46,8 @@ public class NodeProperties extends AbstractProperties { STARTING_PARTITION_ID( OptionTypes.INTEGER, -1, - "The first partition id to assign to iodevices on this node (-1 == auto-assign)"); + "The first partition id to assign to iodevices on this node (-1 == auto-assign)"), + ACTIVE_PARTITIONS(OptionTypes.STRING_ARRAY, null, "List of node active partitions"); private final IOptionType type; private final Object defaultValue; @@ -95,7 +96,7 @@ public class NodeProperties extends AbstractProperties { @Override public boolean hidden() { - return this == INITIAL_RUN || this == STARTING_PARTITION_ID; + return this == INITIAL_RUN || this == STARTING_PARTITION_ID || this == ACTIVE_PARTITIONS; } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java index 3b6100c..5ba378d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java @@ -25,6 +25,7 @@ import static org.apache.hyracks.util.file.FileUtil.joinPath; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -35,6 +36,7 @@ import java.util.SortedMap; import java.util.TreeMap; import 
java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.exceptions.AsterixException; @@ -192,6 +194,17 @@ public class PropertiesAccessor implements IApplicationConfig { return clusterPartitions; } + public Set<Integer> getActivePartitions(String nodeId) { + // by default, node active partitions are the partitions assigned to the node + String[] activePartitions = cfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS); + if (activePartitions == null) { + ClusterPartition[] nodeClusterPartitions = nodePartitionsMap.get(nodeId); + return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId) + .collect(Collectors.toSet()); + } + return Arrays.stream(activePartitions).map(Integer::parseInt).collect(Collectors.toSet()); + } + public List<AsterixExtension> getExtensions() { return extensions; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 98a97b0..7e50102 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -139,19 +139,15 @@ public class ClusterStateManager implements IClusterStateManager { } @Override - public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters) { + public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters, + Set<Integer> activePartitions) { if (active) { updateClusterCounters(nodeId, localCounters); participantNodes.add(nodeId); + activateNodePartitions(nodeId, activePartitions); } else { participantNodes.remove(nodeId); - } - ClusterPartition[] nodePartitions = 
node2PartitionsMap.get(nodeId); - // if this isn't a storage node, it will not have cluster partitions - if (nodePartitions != null) { - for (ClusterPartition p : nodePartitions) { - updateClusterPartition(p.getPartitionId(), nodeId, active); - } + deactivateNodePartitions(nodeId); } } @@ -416,11 +412,10 @@ public class ClusterStateManager implements IClusterStateManager { public synchronized void deregisterNodePartitions(String nodeId) throws HyracksDataException { ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId); if (nodePartitions == null) { - LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " (already removed?)"); + LOGGER.info("deregisterNodePartitions unknown node {} (already removed?)", nodeId); } else { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("deregisterNodePartitions for node " + nodeId + ": " + Arrays.toString(nodePartitions)); - } + LOGGER.info("deregisterNodePartitions for node {}: {}", () -> nodeId, + () -> Arrays.toString(nodePartitions)); for (ClusterPartition nodePartition : nodePartitions) { clusterPartitions.remove(nodePartition.getPartitionId()); } @@ -431,12 +426,12 @@ public class ClusterStateManager implements IClusterStateManager { @Override public synchronized void removePending(String nodeId) { if (LOGGER.isInfoEnabled()) { - LOGGER.info("Registering intention to remove node id " + nodeId); + LOGGER.info("Registering intention to remove node id {}", nodeId); } if (participantNodes.contains(nodeId)) { pendingRemoval.add(nodeId); } else { - LOGGER.warn("Cannot register unknown node " + nodeId + " for pending removal"); + LOGGER.warn("Cannot register unknown node {} for pending removal", nodeId); } } @@ -496,6 +491,19 @@ public class ClusterStateManager implements IClusterStateManager { }); } + private synchronized void activateNodePartitions(String nodeId, Set<Integer> activePartitions) { + for (Integer partitionId : activePartitions) { + updateClusterPartition(partitionId, nodeId, true); + } + } + + 
private synchronized void deactivateNodePartitions(String nodeId) { + clusterPartitions.values().stream() + .filter(partition -> partition.getActiveNodeId() != null && partition.getActiveNodeId().equals(nodeId)) + .forEach(nodeActivePartition -> updateClusterPartition(nodeActivePartition.getPartitionId(), nodeId, + false)); + } + private static InetSocketAddress getReplicaLocation(IClusterStateManager csm, String nodeId) { final Map<IOption, Object> ncConfig = csm.getActiveNcConfiguration().get(nodeId); if (ncConfig == null) {