Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2004

Change subject: [NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes
......................................................................

[NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes

- Ensure thread factory is configured before using it
- Don't mark cluster state ACTIVE until after global recovery has
  completed
- Failure of global recovery causes CC to shut down
- Don't mark cluster state ACTIVE until max resource id has been
  reported by all nodes

Change-Id: Id30415325047008c013e305ca11ccbb76bc7d8d8
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
M 
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
19 files changed, 136 insertions(+), 88 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/04/2004/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 37e0c58..7f56c9d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -28,6 +28,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -86,7 +87,7 @@
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.config.IConfigManager;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.control.cc.BaseCCApplication;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -108,6 +109,11 @@
     private IHyracksClientConnection hcc;
 
     @Override
+    public ThreadFactory fixupThreadFactory(ThreadFactory orig, 
ILifeCycleComponentManager lifeCycleComponentManager) {
+        return new AsterixThreadFactory(orig, lifeCycleComponentManager);
+    }
+
+    @Override
     public void start(IServiceContext serviceCtx, String[] args) throws 
Exception {
         if (args.length > 0) {
             throw new IllegalArgumentException("Unrecognized argument(s): " + 
Arrays.toString(args));
@@ -122,8 +128,6 @@
             LOGGER.info("Starting Asterix cluster controller");
         }
 
-        ccServiceCtx.setThreadFactory(
-                new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new 
LifeCycleComponentManager()));
         String strIP = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
         int port = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
         hcc = new HyracksConnection(strIP, port);
@@ -207,8 +211,8 @@
     }
 
     protected HttpServer setupJSONAPIServer(ExternalProperties 
externalProperties) throws Exception {
-        HttpServer jsonAPIServer =
-                new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), externalProperties.getAPIServerPort());
+        HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(),
+                externalProperties.getAPIServerPort());
         jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3209557..13f3afa 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -92,11 +92,8 @@
         MetadataTransactionContext mdTxnCtx = null;
         try {
             MetadataManager.INSTANCE.init();
-            // Loop over datasets
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
-            for (Dataverse dataverse : 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
-                mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
-            }
+            mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         } catch (Exception e) {
             // This needs to be fixed <-- Needs to shutdown the system -->
@@ -109,8 +106,8 @@
                 try {
                     MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
                 } catch (Exception e1) {
-                    LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
-                    e1.addSuppressed(e);
+                    LOGGER.log(Level.SEVERE, "Exception aborting metadata 
transaction", e1);
+                    e.addSuppressed(e1);
                     throw new IllegalStateException(e);
                 }
             }
@@ -118,11 +115,21 @@
         }
         recoveryCompleted = true;
         LOGGER.info("Global Recovery Completed");
+        appCtx.getClusterStateManager().refreshState();
+    }
+
+    protected MetadataTransactionContext doRecovery(ICcApplicationContext 
appCtx, MetadataTransactionContext mdTxnCtx)
+            throws Exception {
+        // Loop over datasets
+        for (Dataverse dataverse : 
MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
+            mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
+        }
+        return mdTxnCtx;
     }
 
     @Override
     public void notifyStateChange(ClusterState newState) {
-        if (newState != ClusterState.ACTIVE) {
+        if (newState != ClusterState.ACTIVE && newState != 
ClusterState.RECOVERING) {
             recoveryCompleted = false;
         }
     }
@@ -132,8 +139,8 @@
         if 
(!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME))
 {
             MetadataProvider metadataProvider = new MetadataProvider(appCtx, 
dataverse);
             try {
-                List<Dataset> datasets =
-                        
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, 
dataverse.getDataverseName());
+                List<Dataset> datasets = 
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                        dataverse.getDataverseName());
                 for (Dataset dataset : datasets) {
                     if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
                         // External dataset
@@ -145,8 +152,8 @@
                         TransactionState datasetState = dsd.getState();
                         if (!indexes.isEmpty()) {
                             if (datasetState == TransactionState.BEGIN) {
-                                List<ExternalFile> files =
-                                        
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+                                List<ExternalFile> files = 
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
+                                        dataset);
                                 // if persumed abort, roll backward
                                 // 1. delete all pending files
                                 for (ExternalFile file : files) {
@@ -157,8 +164,8 @@
                             }
                             // 2. clean artifacts in NCs
                             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                            JobSpecification jobSpec =
-                                    
ExternalIndexingOperations.buildAbortOp(dataset, indexes, metadataProvider);
+                            JobSpecification jobSpec = 
ExternalIndexingOperations.buildAbortOp(dataset, indexes,
+                                    metadataProvider);
                             executeHyracksJob(jobSpec);
                             // 3. correct the dataset state
                             ((ExternalDatasetDetails) 
dataset.getDatasetDetails()).setState(TransactionState.COMMIT);
@@ -166,13 +173,13 @@
                             
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                             mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
                         } else if (datasetState == 
TransactionState.READY_TO_COMMIT) {
-                            List<ExternalFile> files =
-                                    
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+                            List<ExternalFile> files = 
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
+                                    dataset);
                             // if ready to commit, roll forward
                             // 1. commit indexes in NCs
                             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-                            JobSpecification jobSpec =
-                                    
ExternalIndexingOperations.buildRecoverOp(dataset, indexes, metadataProvider);
+                            JobSpecification jobSpec = 
ExternalIndexingOperations.buildRecoverOp(dataset, indexes,
+                                    metadataProvider);
                             executeHyracksJob(jobSpec);
                             // 2. add pending files in metadata
                             for (ExternalFile file : files) {
@@ -221,4 +228,5 @@
     public boolean isRecoveryCompleted() {
         return recoveryCompleted;
     }
+
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0f6b396..05e7ccf 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -22,6 +22,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -55,6 +56,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.api.messages.IMessageBroker;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -80,13 +82,16 @@
     }
 
     @Override
+    public ThreadFactory fixupThreadFactory(ThreadFactory orig, 
ILifeCycleComponentManager lifeCycleComponentManager) {
+        return new AsterixThreadFactory(orig, lifeCycleComponentManager);
+    }
+
+    @Override
     public void start(IServiceContext serviceCtx, String[] args) throws 
Exception {
         if (args.length > 0) {
             throw new IllegalArgumentException("Unrecognized argument(s): " + 
Arrays.toString(args));
         }
         this.ncServiceCtx = (INCServiceContext) serviceCtx;
-        ncServiceCtx.setThreadFactory(
-                new AsterixThreadFactory(ncServiceCtx.getThreadFactory(), 
ncServiceCtx.getLifeCycleComponentManager()));
         nodeId = this.ncServiceCtx.getNodeId();
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -111,8 +116,8 @@
         MessagingProperties messagingProperties = 
runtimeContext.getMessagingProperties();
         IMessageBroker messageBroker = new NCMessageBroker(controllerService, 
messagingProperties);
         this.ncServiceCtx.setMessageBroker(messageBroker);
-        MessagingChannelInterfaceFactory interfaceFactory =
-                new MessagingChannelInterfaceFactory((NCMessageBroker) 
messageBroker, messagingProperties);
+        MessagingChannelInterfaceFactory interfaceFactory = new 
MessagingChannelInterfaceFactory(
+                (NCMessageBroker) messageBroker, messagingProperties);
         
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
 
         IRecoveryManager recoveryMgr = 
runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -125,8 +130,8 @@
                 LOGGER.info("Stores: " + 
PrintUtil.toString(metadataProperties.getStores()));
                 LOGGER.info("Root Metadata Store: " + 
metadataProperties.getStores().get(nodeId)[0]);
             }
-            PersistentLocalResourceRepository localResourceRepository =
-                    (PersistentLocalResourceRepository) 
runtimeContext.getLocalResourceRepository();
+            PersistentLocalResourceRepository localResourceRepository = 
(PersistentLocalResourceRepository) runtimeContext
+                    .getLocalResourceRepository();
             
localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
         }
 
@@ -224,8 +229,8 @@
         String[] ioDevices = ((PersistentLocalResourceRepository) 
runtimeContext.getLocalResourceRepository())
                 .getStorageMountingPoints();
         for (String ioDevice : ioDevices) {
-            String tempDatasetsDir =
-                    ioDevice + storageDirName + File.separator + 
StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
+            String tempDatasetsDir = ioDevice + storageDirName + File.separator
+                    + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
             File tmpDsDir = new File(tempDatasetsDir);
             if (tmpDsDir.exists()) {
                 IoUtil.delete(tmpDsDir);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index bdbf4a5..1c2c6b3 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -20,23 +20,24 @@
 
 public interface IClusterManagementWork {
 
-    public enum WorkType {
+    enum WorkType {
         ADD_NODE,
         REMOVE_NODE
     }
 
-    public enum ClusterState {
+    enum ClusterState {
         STARTING,
         PENDING,
+        RECOVERING,
         ACTIVE,
         UNUSABLE,
         REBALANCING,
         SHUTTING_DOWN
     }
 
-    public WorkType getClusterManagementWorkType();
+    WorkType getClusterManagementWorkType();
 
-    public int getWorkId();
+    int getWorkId();
 
-    public IClusterEventsSubscriber getSourceSubscriber();
+    IClusterEventsSubscriber getSourceSubscriber();
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index d36d383..ce49ccf 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -18,12 +18,14 @@
  */
 package org.apache.asterix.common.transactions;
 
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
 public interface IResourceIdManager {
 
     long createResourceId();
 
     boolean reported(String nodeId);
 
-    void report(String nodeId, long maxResourceId);
+    void report(String nodeId, long maxResourceId) throws HyracksDataException;
 
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
index 8a3392a..a97e22a 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.metadata;
 
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -29,7 +30,9 @@
 public class GarbageCollector implements Runnable {
     private static final Logger LOGGER = 
Logger.getLogger(GarbageCollector.class.getName());
 
-    private static final long CLEANUP_PERIOD = 3600L * 24;
+    // TODO(mblow): make this configurable
+    private static final long CLEANUP_PERIOD = 1;
+    private static final TimeUnit CLEANUP_PERIOD_UNIT = TimeUnit.DAYS;
 
     static {
         // Starts the garbage collector thread which
@@ -40,13 +43,13 @@
     }
 
     @Override
-    @SuppressWarnings("squid:S2142") // rethrow or interrupt thread on 
InterruptedException
+    @SuppressWarnings({"squid:S2142", "squid:S2189"}) // rethrow/interrupt 
thread on InterruptedException, endless loop
     public void run() {
         LOGGER.info("Starting Metadata GC");
         while (true) {
             try {
                 synchronized (this) {
-                    this.wait(CLEANUP_PERIOD);
+                    CLEANUP_PERIOD_UNIT.timedWait(this, CLEANUP_PERIOD);
                 }
                 MetadataManager.INSTANCE.cleanupTempDatasets();
             } catch (InterruptedException e) {
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 82a1177..decc1a9 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -71,6 +71,6 @@
 
     @Override
     public String toString() {
-        return ReportMaxResourceIdRequestMessage.class.getSimpleName();
+        return ResourceIdRequestMessage.class.getSimpleName();
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 6a5ed08..afa626d 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -24,6 +24,7 @@
 
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdManager implements IResourceIdManager {
 
@@ -59,13 +60,14 @@
     }
 
     @Override
-    public synchronized void report(String nodeId, long maxResourceId) {
+    public synchronized void report(String nodeId, long maxResourceId) throws 
HyracksDataException {
         if (!allReported) {
             globalResourceId.set(Math.max(maxResourceId, 
globalResourceId.get()));
             reportedNodes.add(nodeId);
             if (reportedNodes.size() == csm.getNumberOfNodes()) {
                 reportedNodes = null;
                 allReported = true;
+                csm.refreshState();
             }
         }
     }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 36cb10d..95cc460 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -40,6 +40,7 @@
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.asterix.event.schema.cluster.Node;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -118,6 +119,10 @@
 
     @Override
     public synchronized void setState(ClusterState state) {
+        if (this.state == state) {
+            LOGGER.info("ignoring update to same cluster state of " + 
this.state);
+            return;
+        }
         LOGGER.info("updating cluster state from " + this.state + " to " + 
state.name());
         this.state = state;
         appCtx.getGlobalRecoveryManager().notifyStateChange(state);
@@ -166,24 +171,33 @@
             setState(ClusterState.UNUSABLE);
             return;
         }
-
         for (ClusterPartition p : clusterPartitions.values()) {
             if (!p.isActive()) {
                 setState(ClusterState.UNUSABLE);
                 return;
             }
         }
-
+        IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+        for (String node : activeNcConfiguration.keySet()) {
+            if (!resourceIdManager.reported(node)) {
+                setState(ClusterState.UNUSABLE);
+                return;
+            }
+        }
         // if all storage partitions are active as well as the metadata node, 
then the cluster is active
         if (metadataNodeActive) {
-            if (state != ClusterState.ACTIVE) {
+            if (state != ClusterState.ACTIVE && state != 
ClusterState.RECOVERING) {
                 setState(ClusterState.PENDING);
             }
             appCtx.getMetadataBootstrap().init();
-            setState(ClusterState.ACTIVE);
-            notifyAll();
-            // start global recovery
-            appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+
+            if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+                setState(ClusterState.ACTIVE);
+            } else {
+                // start global recovery
+                setState(ClusterState.RECOVERING);
+                appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+            }
         } else {
             setState(ClusterState.PENDING);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
index 3ce314f..d249bb5 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
@@ -18,10 +18,15 @@
  */
 package org.apache.hyracks.api.application;
 
+import java.util.concurrent.ThreadFactory;
+
 import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.kohsuke.args4j.OptionHandlerFilter;
 
 public interface IApplication {
+    ThreadFactory fixupThreadFactory(ThreadFactory orig, 
ILifeCycleComponentManager lifeCycleComponentManager);
+
     void start(IServiceContext ctx, String[] args) throws Exception; //NOSONAR
 
     void startupCompleted() throws Exception; //NOSONAR
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
index bc3d7a1..5673f01 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
@@ -44,8 +44,6 @@
 
     ThreadFactory getThreadFactory();
 
-    void setThreadFactory(ThreadFactory threadFactory);
-
     IApplicationConfig getAppConfig();
 
     /**
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index b94cf01..2513456 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.control.cc;
 
 import java.util.Arrays;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -28,6 +29,7 @@
 import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.ControllerConfig;
 import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -40,6 +42,11 @@
     }
 
     @Override
+    public ThreadFactory fixupThreadFactory(ThreadFactory orig, 
ILifeCycleComponentManager lifeCycleComponentManager) {
+        return orig;
+    }
+
+    @Override
     public void start(IServiceContext serviceCtx, String[] args) throws 
Exception {
         if (args.length > 0) {
             throw new IllegalArgumentException("Unrecognized argument(s): " + 
Arrays.toString(args));
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a243bf8..bb712db 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -36,6 +36,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -52,6 +53,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.job.JobIdFactory;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.api.topology.ClusterTopology;
 import org.apache.hyracks.api.topology.TopologyDefinitionParser;
@@ -79,6 +81,7 @@
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.logs.LogFile;
 import org.apache.hyracks.control.common.shutdown.ShutdownRun;
+import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 import org.apache.hyracks.control.common.work.WorkQueue;
 import org.apache.hyracks.ipc.api.IIPCI;
 import org.apache.hyracks.ipc.impl.IPCSystem;
@@ -198,9 +201,9 @@
         clusterIPC = new IPCSystem(new 
InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
                 new CCNCFunctions.SerializerDeserializer());
         IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
-        clientIPC =
-                new IPCSystem(new 
InetSocketAddress(ccConfig.getClientListenAddress(), 
ccConfig.getClientListenPort()),
-                        ciIPCI, new 
JavaSerializationBasedPayloadSerializerDeserializer());
+        clientIPC = new IPCSystem(
+                new InetSocketAddress(ccConfig.getClientListenAddress(), 
ccConfig.getClientListenPort()), ciIPCI,
+                new JavaSerializationBasedPayloadSerializerDeserializer());
         webServer = new WebServer(this, ccConfig.getConsoleListenPort());
         clusterIPC.start();
         clientIPC.start();
@@ -219,17 +222,19 @@
     }
 
     private void startApplication() throws Exception {
-        serviceCtx = new CCServiceContext(this, serverCtx, ccContext, 
ccConfig.getAppConfig());
+        ThreadFactory threadFactory = application.fixupThreadFactory(new 
HyracksThreadFactory("ClusterController"),
+                new LifeCycleComponentManager());
+        serviceCtx = new CCServiceContext(this, serverCtx, ccContext, 
ccConfig.getAppConfig(), threadFactory);
         serviceCtx.addJobLifecycleListener(datasetDirectoryService);
-        executor = 
Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+        executor = Executors.newCachedThreadPool(threadFactory);
         application.start(serviceCtx, ccConfig.getAppArgsArray());
         IJobCapacityController jobCapacityController = 
application.getJobCapacityController();
 
         // Job manager is in charge of job lifecycle management.
         try {
-            Constructor<?> jobManagerConstructor =
-                    
this.getClass().getClassLoader().loadClass(ccConfig.getJobManagerClass()).getConstructor(
-                            CCConfig.class, ClusterControllerService.class, 
IJobCapacityController.class);
+            Constructor<?> jobManagerConstructor = 
this.getClass().getClassLoader()
+                    .loadClass(ccConfig.getJobManagerClass())
+                    .getConstructor(CCConfig.class, 
ClusterControllerService.class, IJobCapacityController.class);
             jobManager = (IJobManager) 
jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
         } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException | NoSuchMethodException
                 | InvocationTargetException e) {
@@ -406,8 +411,8 @@
 
         @Override
         public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws HyracksDataException {
-            GetIpAddressNodeNameMapWork ginmw =
-                    new GetIpAddressNodeNameMapWork(ClusterControllerService.this.getNodeManager(), map);
+            GetIpAddressNodeNameMapWork ginmw = new GetIpAddressNodeNameMapWork(
+                    ClusterControllerService.this.getNodeManager(), map);
             try {
                 workQueue.scheduleAndSync(ginmw);
             } catch (Exception e) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index 26245e1..ab436ed 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
@@ -41,7 +42,6 @@
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
 import org.apache.hyracks.control.common.context.ServerContext;
-import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
 public class CCServiceContext extends ServiceContext implements ICCServiceContext {
@@ -58,8 +58,8 @@
     private final ClusterControllerService ccs;
 
     public CCServiceContext(ClusterControllerService ccs, ServerContext serverCtx, ICCContext ccContext,
-            IApplicationConfig appConfig) throws IOException {
-        super(serverCtx, appConfig, new HyracksThreadFactory("ClusterController"));
+                            IApplicationConfig appConfig, ThreadFactory threadFactory) throws IOException {
+        super(serverCtx, appConfig, threadFactory);
         this.ccContext = ccContext;
         this.ccs = ccs;
         initPendingNodeIds = new HashSet<>();
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
index 1ee2315..2aa60ac 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
@@ -31,7 +31,7 @@
 public abstract class ServiceContext implements IServiceContext {
     protected final ServerContext serverCtx;
     protected final IApplicationConfig appConfig;
-    protected ThreadFactory threadFactory;
+    protected final ThreadFactory threadFactory;
     protected Serializable distributedState;
     protected IMessageBroker messageBroker;
     protected IJobSerializerDeserializerContainer jobSerDeContainer = new JobSerializerDeserializerContainer();
@@ -65,11 +65,6 @@
     @Override
     public ThreadFactory getThreadFactory() {
         return threadFactory;
-    }
-
-    @Override
-    public void setThreadFactory(ThreadFactory threadFactory) {
-        this.threadFactory = threadFactory;
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 5afc98d..f180d18 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -20,6 +20,7 @@
 
 import java.lang.management.ManagementFactory;
 import java.util.Arrays;
+import java.util.concurrent.ThreadFactory;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -29,6 +30,7 @@
 import org.apache.hyracks.api.config.Section;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.controllers.ControllerConfig;
 import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -41,6 +43,11 @@
     }
 
     @Override
+    public ThreadFactory fixupThreadFactory(ThreadFactory orig, ILifeCycleComponentManager lifeCycleComponentManager) {
+        return orig;
+    }
+
+    @Override
     public void start(IServiceContext ncAppCtx, String[] args) throws Exception {
         if (args.length > 0) {
             throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b52675c..53197b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -37,6 +37,7 @@
 import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
@@ -69,6 +70,7 @@
 import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
 import org.apache.hyracks.control.common.ipc.IControllerRemoteProxyIPCEventListener;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
+import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 import org.apache.hyracks.control.common.utils.PidHelper;
 import org.apache.hyracks.control.common.work.FutureValue;
 import org.apache.hyracks.control.common.work.WorkQueue;
@@ -364,9 +366,10 @@
     }
 
     private void startApplication() throws Exception {
-        serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
+        ThreadFactory threadFactory = application.fixupThreadFactory(new HyracksThreadFactory(id), lccm);
+        serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig(), threadFactory);
+        executor = Executors.newCachedThreadPool(threadFactory);
         application.start(serviceCtx, ncConfig.getAppArgsArray());
-        executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
     }
 
     public void updateMaxJobId(JobId jobId) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index dc0bf0c..7f65a42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -19,8 +19,8 @@
 package org.apache.hyracks.control.nc.application;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IStateDumpHandler;
@@ -31,7 +31,6 @@
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.common.application.ServiceContext;
 import org.apache.hyracks.control.common.context.ServerContext;
-import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
@@ -45,22 +44,16 @@
     private final NodeControllerService ncs;
     private IChannelInterfaceFactory messagingChannelInterfaceFactory;
 
-    public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager,
-            String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
-            IApplicationConfig appConfig) throws IOException {
-        super(serverCtx, appConfig, new HyracksThreadFactory(nodeId));
+    public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager, String nodeId,
+            MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
+            IApplicationConfig appConfig, ThreadFactory threadFactory) throws IOException {
+        super(serverCtx, appConfig, threadFactory);
         this.lccm = lifeCyclecomponentManager;
         this.nodeId = nodeId;
         this.ioManager = ioManager;
         this.memoryManager = memoryManager;
         this.ncs = ncs;
-        sdh = new IStateDumpHandler() {
-
-            @Override
-            public void dumpState(OutputStream os) throws IOException {
-                lccm.dumpState(os);
-            }
-        };
+        sdh = os -> lccm.dumpState(os);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
index ee74b75..5e362ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
@@ -110,10 +110,6 @@
     }
 
     @Override
-    public void setThreadFactory(ThreadFactory threadFactory) {
-    }
-
-    @Override
     public IApplicationConfig getAppConfig() {
         return null;
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2004
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id30415325047008c013e305ca11ccbb76bc7d8d8
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to