This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f405b3  [NO ISSUE][CLUS] Add node active partitions config
2f405b3 is described below

commit 2f405b312c75f947cb947c3f915f10c2b7f812c0
Author: Murtadha Hubail <murtadha.hub...@couchbase.com>
AuthorDate: Thu Aug 5 03:13:36 2021 +0300

    [NO ISSUE][CLUS] Add node active partitions config
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Add a new config (ACTIVE_PARTITIONS) that contains the current
      list of active partitions on a node.
    - By default, a node's active partitions list is the same as the
      node's assigned partitions.
    - Pass node active partitions to CC during bootstrap tasks.
    - Adapt test cases.
    
    Change-Id: Ia91e15897221f512aeeccbbe134f1d91db8aa629
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12663
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |  9 ++----
 .../app/replication/NcLifecycleCoordinator.java    | 23 +++++++-------
 .../message/NCLifecycleTaskReportMessage.java      | 11 ++++++-
 .../message/RegistrationTasksRequestMessage.java   | 15 ++++++---
 .../message/RegistrationTasksResponseMessage.java  |  5 ++-
 .../asterix/hyracks/bootstrap/NCApplication.java   |  3 +-
 .../asterix/runtime/ClusterStateManagerTest.java   | 21 ++++++++++++-
 .../common/cluster/IClusterStateManager.java       |  4 ++-
 .../asterix/common/config/MetadataProperties.java  |  5 +++
 .../asterix/common/config/NodeProperties.java      |  5 +--
 .../asterix/common/config/PropertiesAccessor.java  | 13 ++++++++
 .../asterix/runtime/utils/ClusterStateManager.java | 36 +++++++++++++---------
 12 files changed, 106 insertions(+), 44 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 9aa433f..eb8a92f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -21,12 +21,10 @@ package org.apache.asterix.app.nc;
 import java.io.IOException;
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
-import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.active.ActiveManager;
 import org.apache.asterix.common.api.IConfigValidator;
@@ -38,7 +36,6 @@ import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.api.IPropertiesFactory;
 import org.apache.asterix.common.api.IReceptionist;
 import org.apache.asterix.common.api.IReceptionistFactory;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
@@ -225,10 +222,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
                 new DatasetLifecycleManager(storageProperties, 
localResourceRepository, txnSubsystem.getLogManager(),
                         virtualBufferCache, indexCheckpointManagerProvider, 
ioManager.getIODevices().size());
         final String nodeId = getServiceContext().getNodeId();
-        final ClusterPartition[] nodePartitions = 
metadataProperties.getNodePartitions().get(nodeId);
-        final Set<Integer> nodePartitionsIds =
-                
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
-        replicaManager = new ReplicaManager(this, nodePartitionsIds);
+        final Set<Integer> nodePartitions = 
metadataProperties.getNodeActivePartitions(nodeId);
+        replicaManager = new ReplicaManager(this, nodePartitions);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, 
getServiceContext().getNodeId(),
                 activeProperties.getMemoryComponentGlobalBudget(), 
compilerProperties.getFrameSize(),
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index c4e4f82..f164773 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -26,14 +26,12 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -51,7 +49,6 @@ import 
org.apache.asterix.app.replication.message.RegistrationTasksRequestMessag
 import 
org.apache.asterix.app.replication.message.RegistrationTasksResponseMessage;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.api.INCLifecycleTask;
-import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -102,7 +99,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
     @Override
     public void notifyNodeFailure(String nodeId, InetSocketAddress 
replicaAddress) throws HyracksDataException {
         pendingStartupCompletionNodes.remove(nodeId);
-        clusterManager.updateNodeState(nodeId, false, null);
+        clusterManager.updateNodeState(nodeId, false, null, null);
         if (nodeId.equals(metadataNodeId)) {
             clusterManager.updateMetadataNode(metadataNodeId, false);
         }
@@ -138,7 +135,8 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
     private void process(RegistrationTasksRequestMessage msg) throws 
HyracksDataException {
         final String nodeId = msg.getNodeId();
         nodeSecretsMap.put(nodeId, msg.getSecrets());
-        List<INCLifecycleTask> tasks = buildNCRegTasks(msg.getNodeId(), 
msg.getNodeStatus(), msg.getState());
+        List<INCLifecycleTask> tasks =
+                buildNCRegTasks(msg.getNodeId(), msg.getNodeStatus(), 
msg.getState(), msg.getActivePartitions());
         RegistrationTasksResponseMessage response = new 
RegistrationTasksResponseMessage(nodeId, tasks);
         try {
             messageBroker.sendApplicationMessageToNC(response, 
msg.getNodeId());
@@ -157,7 +155,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
             return;
         }
         if (msg.isSuccess()) {
-            clusterManager.updateNodeState(msg.getNodeId(), true, 
msg.getLocalCounters());
+            clusterManager.updateNodeState(msg.getNodeId(), true, 
msg.getLocalCounters(), msg.getActivePartitions());
             if (msg.getNodeId().equals(metadataNodeId)) {
                 clusterManager.updateMetadataNode(metadataNodeId, true);
             }
@@ -167,7 +165,8 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
         }
     }
 
-    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus 
nodeStatus, SystemState state) {
+    protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus 
nodeStatus, SystemState state,
+            Set<Integer> activePartitions) {
         LOGGER.info("Building registration tasks for node {} with status {} 
and system state: {}", nodeId, nodeStatus,
                 state);
         final boolean isMetadataNode = nodeId.equals(metadataNodeId);
@@ -175,7 +174,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
             case ACTIVE:
                 return buildActiveNCRegTasks(isMetadataNode);
             case IDLE:
-                return buildIdleNcRegTasks(nodeId, isMetadataNode, state);
+                return buildIdleNcRegTasks(nodeId, isMetadataNode, state, 
activePartitions);
             default:
                 return new ArrayList<>();
         }
@@ -210,13 +209,13 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
         }
     }
 
-    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, 
boolean metadataNode, SystemState state) {
+    protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, 
boolean metadataNode, SystemState state,
+            Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
         if (state == SystemState.CORRUPTED) {
-            // need to perform local recovery for node partitions
-            LocalRecoveryTask rt = new 
LocalRecoveryTask(Arrays.stream(clusterManager.getNodePartitions(newNodeId))
-                    
.map(ClusterPartition::getPartitionId).collect(Collectors.toSet()));
+            // need to perform local recovery for node active partitions
+            LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
             tasks.add(rt);
         }
         if (replicationEnabled) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index 79fa7c8..1309369 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,6 +18,8 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.Set;
+
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
@@ -31,11 +33,14 @@ public class NCLifecycleTaskReportMessage implements 
INCLifecycleMessage, ICcAdd
     private final boolean success;
     private Throwable exception;
     private final NcLocalCounters localCounters;
+    private final Set<Integer> activePartitions;
 
-    public NCLifecycleTaskReportMessage(String nodeId, boolean success, 
NcLocalCounters localCounters) {
+    public NCLifecycleTaskReportMessage(String nodeId, boolean success, 
NcLocalCounters localCounters,
+            Set<Integer> activePartitions) {
         this.nodeId = nodeId;
         this.success = success;
         this.localCounters = localCounters;
+        this.activePartitions = activePartitions;
     }
 
     @Override
@@ -67,4 +72,8 @@ public class NCLifecycleTaskReportMessage implements 
INCLifecycleMessage, ICcAdd
     public MessageType getType() {
         return MessageType.REGISTRATION_TASKS_RESULT;
     }
+
+    public Set<Integer> getActivePartitions() {
+        return activePartitions;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index c2cc63c..fb50b3e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -20,6 +20,7 @@ package org.apache.asterix.app.replication.message;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -42,20 +43,22 @@ public class RegistrationTasksRequestMessage implements 
INCLifecycleMessage, ICc
     protected final String nodeId;
     protected final NodeStatus nodeStatus;
     protected final Map<String, Object> secrets;
+    protected final Set<Integer> activePartitions;
 
     public RegistrationTasksRequestMessage(String nodeId, NodeStatus 
nodeStatus, SystemState state,
-            Map<String, Object> secretsEphemeral) {
+            Map<String, Object> secretsEphemeral, Set<Integer> 
activePartitions) {
         this.state = state;
         this.nodeId = nodeId;
         this.nodeStatus = nodeStatus;
         this.secrets = new HashMap<>(secretsEphemeral);
+        this.activePartitions = activePartitions;
     }
 
     public static void send(CcId ccId, NodeControllerService cs, NodeStatus 
nodeStatus, SystemState systemState,
-            Map<String, Object> secretsEphemeral) throws HyracksDataException {
+            Map<String, Object> secretsEphemeral, Set<Integer> 
activePartitions) throws HyracksDataException {
         try {
-            RegistrationTasksRequestMessage msg =
-                    new RegistrationTasksRequestMessage(cs.getId(), 
nodeStatus, systemState, secretsEphemeral);
+            RegistrationTasksRequestMessage msg = new 
RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
+                    systemState, secretsEphemeral, activePartitions);
             ((INCMessageBroker) 
cs.getContext().getMessageBroker()).sendMessageToCC(ccId, msg);
         } catch (Exception e) {
             LOGGER.log(Level.ERROR, "Unable to send 
RegistrationTasksRequestMessage to CC", e);
@@ -88,4 +91,8 @@ public class RegistrationTasksRequestMessage implements 
INCLifecycleMessage, ICc
     public Map<String, Object> getSecrets() {
         return secrets;
     }
+
+    public Set<Integer> getActivePartitions() {
+        return activePartitions;
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 1227a6f..d8e1894 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.replication.message;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -72,7 +73,9 @@ public class RegistrationTasksResponseMessage extends 
CcIdentifiedMessage
             }
             NcLocalCounters localCounter = success ? 
NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) 
appCtx.getServiceContext().getControllerService()) : null;
-            NCLifecycleTaskReportMessage result = new 
NCLifecycleTaskReportMessage(nodeId, success, localCounter);
+            Set<Integer> nodeActivePartitions = 
appCtx.getMetadataProperties().getNodeActivePartitions(nodeId);
+            NCLifecycleTaskReportMessage result =
+                    new NCLifecycleTaskReportMessage(nodeId, success, 
localCounter, nodeActivePartitions);
             result.setException(exception);
             try {
                 broker.sendMessageToCC(getCcId(), result);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index c148c92..5b10512 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -300,7 +300,8 @@ public class NCApplication extends BaseNCApplication {
                 apiServer != null ? Collections.singletonMap(SYS_AUTH_HEADER, 
apiServer.ctx().get(SYS_AUTH_HEADER))
                         : Collections.emptyMap();
         RegistrationTasksRequestMessage.send(ccId, (NodeControllerService) 
ncServiceCtx.getControllerService(),
-                currentStatus, systemState, httpSecrets);
+                currentStatus, systemState, httpSecrets,
+                
runtimeContext.getMetadataProperties().getNodeActivePartitions(nodeId));
     }
 
     @Override
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 9cc295e..b80fa30 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -19,7 +19,9 @@
 package org.apache.asterix.runtime;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -200,7 +202,8 @@ public class ClusterStateManagerTest {
 
     private void notifyNodeStartupCompletion(CcApplicationContext 
applicationContext, String nodeId)
             throws HyracksDataException {
-        NCLifecycleTaskReportMessage msg = new 
NCLifecycleTaskReportMessage(nodeId, true, mockLocalCounters());
+        NCLifecycleTaskReportMessage msg =
+                new NCLifecycleTaskReportMessage(nodeId, true, 
mockLocalCounters(), getNodeActivePartitions(nodeId));
         applicationContext.getNcLifecycleCoordinator().process(msg);
     }
 
@@ -262,4 +265,20 @@ public class ClusterStateManagerTest {
         Mockito.when(localCounters.getMaxTxnId()).thenReturn(1000L);
         return localCounters;
     }
+
+    private static Set<Integer> getNodeActivePartitions(String nodeId) {
+        Set<Integer> activePartitions = new HashSet<>();
+        switch (nodeId) {
+            case NC1:
+                activePartitions.add(0);
+                break;
+            case NC2:
+                activePartitions.add(1);
+                break;
+            case NC3:
+                activePartitions.add(2);
+                break;
+        }
+        return activePartitions;
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index a37e6e4..2ee9435 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -62,9 +62,11 @@ public interface IClusterStateManager {
      * @param nodeId
      * @param active
      * @param ncLocalCounters
+     * @param activePartitions
      * @throws HyracksDataException
      */
-    void updateNodeState(String nodeId, boolean active, NcLocalCounters 
ncLocalCounters) throws HyracksDataException;
+    void updateNodeState(String nodeId, boolean active, NcLocalCounters 
ncLocalCounters, Set<Integer> activePartitions)
+            throws HyracksDataException;
 
     /**
      * Updates the active node and active state of the cluster partition with 
id {@code partitionNum}
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
index 7d5ec42..31708d3 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
@@ -24,6 +24,7 @@ import static 
org.apache.hyracks.control.common.config.OptionTypes.STRING;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -123,6 +124,10 @@ public class MetadataProperties extends AbstractProperties 
{
         return accessor.getClusterPartitions();
     }
 
+    public Set<Integer> getNodeActivePartitions(String nodeId) {
+        return accessor.getActivePartitions(nodeId);
+    }
+
     public Map<String, String> getTransactionLogDirs() {
         return accessor.getTransactionLogDirs();
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
index aaf6316..a03530d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/NodeProperties.java
@@ -46,7 +46,8 @@ public class NodeProperties extends AbstractProperties {
         STARTING_PARTITION_ID(
                 OptionTypes.INTEGER,
                 -1,
-                "The first partition id to assign to iodevices on this node 
(-1 == auto-assign)");
+                "The first partition id to assign to iodevices on this node 
(-1 == auto-assign)"),
+        ACTIVE_PARTITIONS(OptionTypes.STRING_ARRAY, null, "List of node active 
partitions");
 
         private final IOptionType type;
         private final Object defaultValue;
@@ -95,7 +96,7 @@ public class NodeProperties extends AbstractProperties {
 
         @Override
         public boolean hidden() {
-            return this == INITIAL_RUN || this == STARTING_PARTITION_ID;
+            return this == INITIAL_RUN || this == STARTING_PARTITION_ID || 
this == ACTIVE_PARTITIONS;
         }
 
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 3b6100c..5ba378d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -25,6 +25,7 @@ import static org.apache.hyracks.util.file.FileUtil.joinPath;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -35,6 +36,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.exceptions.AsterixException;
@@ -192,6 +194,17 @@ public class PropertiesAccessor implements 
IApplicationConfig {
         return clusterPartitions;
     }
 
+    public Set<Integer> getActivePartitions(String nodeId) {
+        // by default, node actives partitions are the partitions assigned to 
the node
+        String[] activePartitions = 
cfg.getStringArray(NodeProperties.Option.ACTIVE_PARTITIONS);
+        if (activePartitions == null) {
+            ClusterPartition[] nodeClusterPartitions = 
nodePartitionsMap.get(nodeId);
+            return 
Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId)
+                    .collect(Collectors.toSet());
+        }
+        return 
Arrays.stream(activePartitions).map(Integer::parseInt).collect(Collectors.toSet());
+    }
+
     public List<AsterixExtension> getExtensions() {
         return extensions;
     }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 98a97b0..7e50102 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -139,19 +139,15 @@ public class ClusterStateManager implements 
IClusterStateManager {
     }
 
     @Override
-    public synchronized void updateNodeState(String nodeId, boolean active, 
NcLocalCounters localCounters) {
+    public synchronized void updateNodeState(String nodeId, boolean active, 
NcLocalCounters localCounters,
+            Set<Integer> activePartitions) {
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
+            activateNodePartitions(nodeId, activePartitions);
         } else {
             participantNodes.remove(nodeId);
-        }
-        ClusterPartition[] nodePartitions = node2PartitionsMap.get(nodeId);
-        // if this isn't a storage node, it will not have cluster partitions
-        if (nodePartitions != null) {
-            for (ClusterPartition p : nodePartitions) {
-                updateClusterPartition(p.getPartitionId(), nodeId, active);
-            }
+            deactivateNodePartitions(nodeId);
         }
     }
 
@@ -416,11 +412,10 @@ public class ClusterStateManager implements 
IClusterStateManager {
     public synchronized void deregisterNodePartitions(String nodeId) throws 
HyracksDataException {
         ClusterPartition[] nodePartitions = node2PartitionsMap.remove(nodeId);
         if (nodePartitions == null) {
-            LOGGER.info("deregisterNodePartitions unknown node " + nodeId + " 
(already removed?)");
+            LOGGER.info("deregisterNodePartitions unknown node {} (already 
removed?)", nodeId);
         } else {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.info("deregisterNodePartitions for node " + nodeId + ": 
" + Arrays.toString(nodePartitions));
-            }
+            LOGGER.info("deregisterNodePartitions for node {}: {}", () -> 
nodeId,
+                    () -> Arrays.toString(nodePartitions));
             for (ClusterPartition nodePartition : nodePartitions) {
                 clusterPartitions.remove(nodePartition.getPartitionId());
             }
@@ -431,12 +426,12 @@ public class ClusterStateManager implements 
IClusterStateManager {
     @Override
     public synchronized void removePending(String nodeId) {
         if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Registering intention to remove node id " + nodeId);
+            LOGGER.info("Registering intention to remove node id {}", nodeId);
         }
         if (participantNodes.contains(nodeId)) {
             pendingRemoval.add(nodeId);
         } else {
-            LOGGER.warn("Cannot register unknown node " + nodeId + " for 
pending removal");
+            LOGGER.warn("Cannot register unknown node {} for pending removal", 
nodeId);
         }
     }
 
@@ -496,6 +491,19 @@ public class ClusterStateManager implements 
IClusterStateManager {
         });
     }
 
+    private synchronized void activateNodePartitions(String nodeId, 
Set<Integer> activePartitions) {
+        for (Integer partitionId : activePartitions) {
+            updateClusterPartition(partitionId, nodeId, true);
+        }
+    }
+
+    private synchronized void deactivateNodePartitions(String nodeId) {
+        clusterPartitions.values().stream()
+                .filter(partition -> partition.getActiveNodeId() != null && 
partition.getActiveNodeId().equals(nodeId))
+                .forEach(nodeActivePartition -> 
updateClusterPartition(nodeActivePartition.getPartitionId(), nodeId,
+                        false));
+    }
+
     private static InetSocketAddress getReplicaLocation(IClusterStateManager 
csm, String nodeId) {
         final Map<IOption, Object> ncConfig = 
csm.getActiveNcConfiguration().get(nodeId);
         if (ncConfig == null) {

Reply via email to