Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2004
Change subject: [NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes
......................................................................
[NO ISSUE][HYR][*DB][CLUS] Startup lifecycle fixes
- Ensure thread factory is configured before using it
- Don't mark cluster state ACTIVE until after global recovery has
completed
- Failure of global recovery causes CC to shut down
- Don't mark cluster state ACTIVE until max resource id has been
reported by all nodes
Change-Id: Id30415325047008c013e305ca11ccbb76bc7d8d8
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
M
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
19 files changed, 136 insertions(+), 88 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/04/2004/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 37e0c58..7f56c9d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -86,7 +87,7 @@
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.config.IConfigManager;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
-import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.control.cc.BaseCCApplication;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -108,6 +109,11 @@
private IHyracksClientConnection hcc;
@Override
+ public ThreadFactory fixupThreadFactory(ThreadFactory orig,
ILifeCycleComponentManager lifeCycleComponentManager) {
+ return new AsterixThreadFactory(orig, lifeCycleComponentManager);
+ }
+
+ @Override
public void start(IServiceContext serviceCtx, String[] args) throws
Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " +
Arrays.toString(args));
@@ -122,8 +128,6 @@
LOGGER.info("Starting Asterix cluster controller");
}
- ccServiceCtx.setThreadFactory(
- new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new
LifeCycleComponentManager()));
String strIP =
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
int port =
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
hcc = new HyracksConnection(strIP, port);
@@ -207,8 +211,8 @@
}
protected HttpServer setupJSONAPIServer(ExternalProperties
externalProperties) throws Exception {
- HttpServer jsonAPIServer =
- new HttpServer(webManager.getBosses(),
webManager.getWorkers(), externalProperties.getAPIServerPort());
+ HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(),
webManager.getWorkers(),
+ externalProperties.getAPIServerPort());
jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
index 3209557..13f3afa 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/GlobalRecoveryManager.java
@@ -92,11 +92,8 @@
MetadataTransactionContext mdTxnCtx = null;
try {
MetadataManager.INSTANCE.init();
- // Loop over datasets
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
- for (Dataverse dataverse :
MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
- mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
- }
+ mdTxnCtx = doRecovery(appCtx, mdTxnCtx);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
// This needs to be fixed <-- Needs to shutdown the system -->
@@ -109,8 +106,8 @@
try {
MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
} catch (Exception e1) {
- LOGGER.log(Level.SEVERE, "Exception in aborting", e1);
- e1.addSuppressed(e);
+ LOGGER.log(Level.SEVERE, "Exception aborting metadata
transaction", e1);
+ e.addSuppressed(e1);
throw new IllegalStateException(e);
}
}
@@ -118,11 +115,21 @@
}
recoveryCompleted = true;
LOGGER.info("Global Recovery Completed");
+ appCtx.getClusterStateManager().refreshState();
+ }
+
+ protected MetadataTransactionContext doRecovery(ICcApplicationContext
appCtx, MetadataTransactionContext mdTxnCtx)
+ throws Exception {
+ // Loop over datasets
+ for (Dataverse dataverse :
MetadataManager.INSTANCE.getDataverses(mdTxnCtx)) {
+ mdTxnCtx = recoverDataset(appCtx, mdTxnCtx, dataverse);
+ }
+ return mdTxnCtx;
}
@Override
public void notifyStateChange(ClusterState newState) {
- if (newState != ClusterState.ACTIVE) {
+ if (newState != ClusterState.ACTIVE && newState !=
ClusterState.RECOVERING) {
recoveryCompleted = false;
}
}
@@ -132,8 +139,8 @@
if
(!dataverse.getDataverseName().equals(MetadataConstants.METADATA_DATAVERSE_NAME))
{
MetadataProvider metadataProvider = new MetadataProvider(appCtx,
dataverse);
try {
- List<Dataset> datasets =
-
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
dataverse.getDataverseName());
+ List<Dataset> datasets =
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+ dataverse.getDataverseName());
for (Dataset dataset : datasets) {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
// External dataset
@@ -145,8 +152,8 @@
TransactionState datasetState = dsd.getState();
if (!indexes.isEmpty()) {
if (datasetState == TransactionState.BEGIN) {
- List<ExternalFile> files =
-
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ List<ExternalFile> files =
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
+ dataset);
// if persumed abort, roll backward
// 1. delete all pending files
for (ExternalFile file : files) {
@@ -157,8 +164,8 @@
}
// 2. clean artifacts in NCs
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec =
-
ExternalIndexingOperations.buildAbortOp(dataset, indexes, metadataProvider);
+ JobSpecification jobSpec =
ExternalIndexingOperations.buildAbortOp(dataset, indexes,
+ metadataProvider);
executeHyracksJob(jobSpec);
// 3. correct the dataset state
((ExternalDatasetDetails)
dataset.getDatasetDetails()).setState(TransactionState.COMMIT);
@@ -166,13 +173,13 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
} else if (datasetState ==
TransactionState.READY_TO_COMMIT) {
- List<ExternalFile> files =
-
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset);
+ List<ExternalFile> files =
MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx,
+ dataset);
// if ready to commit, roll forward
// 1. commit indexes in NCs
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- JobSpecification jobSpec =
-
ExternalIndexingOperations.buildRecoverOp(dataset, indexes, metadataProvider);
+ JobSpecification jobSpec =
ExternalIndexingOperations.buildRecoverOp(dataset, indexes,
+ metadataProvider);
executeHyracksJob(jobSpec);
// 2. add pending files in metadata
for (ExternalFile file : files) {
@@ -221,4 +228,5 @@
public boolean isRecoveryCompleted() {
return recoveryCompleted;
}
+
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index 0f6b396..05e7ccf 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -55,6 +56,7 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.api.messages.IMessageBroker;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -80,13 +82,16 @@
}
@Override
+ public ThreadFactory fixupThreadFactory(ThreadFactory orig,
ILifeCycleComponentManager lifeCycleComponentManager) {
+ return new AsterixThreadFactory(orig, lifeCycleComponentManager);
+ }
+
+ @Override
public void start(IServiceContext serviceCtx, String[] args) throws
Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " +
Arrays.toString(args));
}
this.ncServiceCtx = (INCServiceContext) serviceCtx;
- ncServiceCtx.setThreadFactory(
- new AsterixThreadFactory(ncServiceCtx.getThreadFactory(),
ncServiceCtx.getLifeCycleComponentManager()));
nodeId = this.ncServiceCtx.getNodeId();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Starting Asterix node controller: " + nodeId);
@@ -111,8 +116,8 @@
MessagingProperties messagingProperties =
runtimeContext.getMessagingProperties();
IMessageBroker messageBroker = new NCMessageBroker(controllerService,
messagingProperties);
this.ncServiceCtx.setMessageBroker(messageBroker);
- MessagingChannelInterfaceFactory interfaceFactory =
- new MessagingChannelInterfaceFactory((NCMessageBroker)
messageBroker, messagingProperties);
+ MessagingChannelInterfaceFactory interfaceFactory = new
MessagingChannelInterfaceFactory(
+ (NCMessageBroker) messageBroker, messagingProperties);
this.ncServiceCtx.setMessagingChannelInterfaceFactory(interfaceFactory);
IRecoveryManager recoveryMgr =
runtimeContext.getTransactionSubsystem().getRecoveryManager();
@@ -125,8 +130,8 @@
LOGGER.info("Stores: " +
PrintUtil.toString(metadataProperties.getStores()));
LOGGER.info("Root Metadata Store: " +
metadataProperties.getStores().get(nodeId)[0]);
}
- PersistentLocalResourceRepository localResourceRepository =
- (PersistentLocalResourceRepository)
runtimeContext.getLocalResourceRepository();
+ PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository) runtimeContext
+ .getLocalResourceRepository();
localResourceRepository.initializeNewUniverse(ClusterProperties.INSTANCE.getStorageDirectoryName());
}
@@ -224,8 +229,8 @@
String[] ioDevices = ((PersistentLocalResourceRepository)
runtimeContext.getLocalResourceRepository())
.getStorageMountingPoints();
for (String ioDevice : ioDevices) {
- String tempDatasetsDir =
- ioDevice + storageDirName + File.separator +
StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
+ String tempDatasetsDir = ioDevice + storageDirName + File.separator
+ + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER;
File tmpDsDir = new File(tempDatasetsDir);
if (tmpDsDir.exists()) {
IoUtil.delete(tmpDsDir);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index bdbf4a5..1c2c6b3 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -20,23 +20,24 @@
public interface IClusterManagementWork {
- public enum WorkType {
+ enum WorkType {
ADD_NODE,
REMOVE_NODE
}
- public enum ClusterState {
+ enum ClusterState {
STARTING,
PENDING,
+ RECOVERING,
ACTIVE,
UNUSABLE,
REBALANCING,
SHUTTING_DOWN
}
- public WorkType getClusterManagementWorkType();
+ WorkType getClusterManagementWorkType();
- public int getWorkId();
+ int getWorkId();
- public IClusterEventsSubscriber getSourceSubscriber();
+ IClusterEventsSubscriber getSourceSubscriber();
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index d36d383..ce49ccf 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -18,12 +18,14 @@
*/
package org.apache.asterix.common.transactions;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
public interface IResourceIdManager {
long createResourceId();
boolean reported(String nodeId);
- void report(String nodeId, long maxResourceId);
+ void report(String nodeId, long maxResourceId) throws HyracksDataException;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
index 8a3392a..a97e22a 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/GarbageCollector.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.metadata;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -29,7 +30,9 @@
public class GarbageCollector implements Runnable {
private static final Logger LOGGER =
Logger.getLogger(GarbageCollector.class.getName());
- private static final long CLEANUP_PERIOD = 3600L * 24;
+ // TODO(mblow): make this configurable
+ private static final long CLEANUP_PERIOD = 1;
+ private static final TimeUnit CLEANUP_PERIOD_UNIT = TimeUnit.DAYS;
static {
// Starts the garbage collector thread which
@@ -40,13 +43,13 @@
}
@Override
- @SuppressWarnings("squid:S2142") // rethrow or interrupt thread on
InterruptedException
+ @SuppressWarnings({"squid:S2142", "squid:S2189"}) // rethrow/interrupt
thread on InterruptedException, endless loop
public void run() {
LOGGER.info("Starting Metadata GC");
while (true) {
try {
synchronized (this) {
- this.wait(CLEANUP_PERIOD);
+ CLEANUP_PERIOD_UNIT.timedWait(this, CLEANUP_PERIOD);
}
MetadataManager.INSTANCE.cleanupTempDatasets();
} catch (InterruptedException e) {
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 82a1177..decc1a9 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -71,6 +71,6 @@
@Override
public String toString() {
- return ReportMaxResourceIdRequestMessage.class.getSimpleName();
+ return ResourceIdRequestMessage.class.getSimpleName();
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 6a5ed08..afa626d 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -24,6 +24,7 @@
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ResourceIdManager implements IResourceIdManager {
@@ -59,13 +60,14 @@
}
@Override
- public synchronized void report(String nodeId, long maxResourceId) {
+ public synchronized void report(String nodeId, long maxResourceId) throws
HyracksDataException {
if (!allReported) {
globalResourceId.set(Math.max(maxResourceId,
globalResourceId.get()));
reportedNodes.add(nodeId);
if (reportedNodes.size() == csm.getNumberOfNodes()) {
reportedNodes = null;
allReported = true;
+ csm.refreshState();
}
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 36cb10d..95cc460 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -40,6 +40,7 @@
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
+import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.asterix.event.schema.cluster.Cluster;
import org.apache.asterix.event.schema.cluster.Node;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -118,6 +119,10 @@
@Override
public synchronized void setState(ClusterState state) {
+ if (this.state == state) {
+ LOGGER.info("ignoring update to same cluster state of " +
this.state);
+ return;
+ }
LOGGER.info("updating cluster state from " + this.state + " to " +
state.name());
this.state = state;
appCtx.getGlobalRecoveryManager().notifyStateChange(state);
@@ -166,24 +171,33 @@
setState(ClusterState.UNUSABLE);
return;
}
-
for (ClusterPartition p : clusterPartitions.values()) {
if (!p.isActive()) {
setState(ClusterState.UNUSABLE);
return;
}
}
-
+ IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
+ for (String node : activeNcConfiguration.keySet()) {
+ if (!resourceIdManager.reported(node)) {
+ setState(ClusterState.UNUSABLE);
+ return;
+ }
+ }
// if all storage partitions are active as well as the metadata node,
then the cluster is active
if (metadataNodeActive) {
- if (state != ClusterState.ACTIVE) {
+ if (state != ClusterState.ACTIVE && state !=
ClusterState.RECOVERING) {
setState(ClusterState.PENDING);
}
appCtx.getMetadataBootstrap().init();
- setState(ClusterState.ACTIVE);
- notifyAll();
- // start global recovery
- appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+
+ if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+ setState(ClusterState.ACTIVE);
+ } else {
+ // start global recovery
+ setState(ClusterState.RECOVERING);
+ appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+ }
} else {
setState(ClusterState.PENDING);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
index 3ce314f..d249bb5 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IApplication.java
@@ -18,10 +18,15 @@
*/
package org.apache.hyracks.api.application;
+import java.util.concurrent.ThreadFactory;
+
import org.apache.hyracks.api.config.IConfigManager;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.kohsuke.args4j.OptionHandlerFilter;
public interface IApplication {
+ ThreadFactory fixupThreadFactory(ThreadFactory orig,
ILifeCycleComponentManager lifeCycleComponentManager);
+
void start(IServiceContext ctx, String[] args) throws Exception; //NOSONAR
void startupCompleted() throws Exception; //NOSONAR
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
index bc3d7a1..5673f01 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/IServiceContext.java
@@ -44,8 +44,6 @@
ThreadFactory getThreadFactory();
- void setThreadFactory(ThreadFactory threadFactory);
-
IApplicationConfig getAppConfig();
/**
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
index b94cf01..2513456 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.control.cc;
import java.util.Arrays;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -28,6 +29,7 @@
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.api.job.resource.DefaultJobCapacityController;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -40,6 +42,11 @@
}
@Override
+ public ThreadFactory fixupThreadFactory(ThreadFactory orig,
ILifeCycleComponentManager lifeCycleComponentManager) {
+ return orig;
+ }
+
+ @Override
public void start(IServiceContext serviceCtx, String[] args) throws
Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " +
Arrays.toString(args));
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index a243bf8..bb712db 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -36,6 +36,7 @@
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -52,6 +53,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.lifecycle.LifeCycleComponentManager;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
@@ -79,6 +81,7 @@
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
import org.apache.hyracks.control.common.logs.LogFile;
import org.apache.hyracks.control.common.shutdown.ShutdownRun;
+import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
import org.apache.hyracks.control.common.work.WorkQueue;
import org.apache.hyracks.ipc.api.IIPCI;
import org.apache.hyracks.ipc.impl.IPCSystem;
@@ -198,9 +201,9 @@
clusterIPC = new IPCSystem(new
InetSocketAddress(ccConfig.getClusterListenPort()), ccIPCI,
new CCNCFunctions.SerializerDeserializer());
IIPCI ciIPCI = new ClientInterfaceIPCI(this, jobIdFactory);
- clientIPC =
- new IPCSystem(new
InetSocketAddress(ccConfig.getClientListenAddress(),
ccConfig.getClientListenPort()),
- ciIPCI, new
JavaSerializationBasedPayloadSerializerDeserializer());
+ clientIPC = new IPCSystem(
+ new InetSocketAddress(ccConfig.getClientListenAddress(),
ccConfig.getClientListenPort()), ciIPCI,
+ new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this, ccConfig.getConsoleListenPort());
clusterIPC.start();
clientIPC.start();
@@ -219,17 +222,19 @@
}
private void startApplication() throws Exception {
- serviceCtx = new CCServiceContext(this, serverCtx, ccContext,
ccConfig.getAppConfig());
+ ThreadFactory threadFactory = application.fixupThreadFactory(new
HyracksThreadFactory("ClusterController"),
+ new LifeCycleComponentManager());
+ serviceCtx = new CCServiceContext(this, serverCtx, ccContext,
ccConfig.getAppConfig(), threadFactory);
serviceCtx.addJobLifecycleListener(datasetDirectoryService);
- executor =
Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
+ executor = Executors.newCachedThreadPool(threadFactory);
application.start(serviceCtx, ccConfig.getAppArgsArray());
IJobCapacityController jobCapacityController =
application.getJobCapacityController();
// Job manager is in charge of job lifecycle management.
try {
- Constructor<?> jobManagerConstructor =
-
this.getClass().getClassLoader().loadClass(ccConfig.getJobManagerClass()).getConstructor(
- CCConfig.class, ClusterControllerService.class,
IJobCapacityController.class);
+ Constructor<?> jobManagerConstructor =
this.getClass().getClassLoader()
+ .loadClass(ccConfig.getJobManagerClass())
+ .getConstructor(CCConfig.class,
ClusterControllerService.class, IJobCapacityController.class);
jobManager = (IJobManager)
jobManagerConstructor.newInstance(ccConfig, this, jobCapacityController);
} catch (ClassNotFoundException | InstantiationException |
IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
@@ -406,8 +411,8 @@
@Override
public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map)
throws HyracksDataException {
- GetIpAddressNodeNameMapWork ginmw =
- new
GetIpAddressNodeNameMapWork(ClusterControllerService.this.getNodeManager(),
map);
+ GetIpAddressNodeNameMapWork ginmw = new
GetIpAddressNodeNameMapWork(
+ ClusterControllerService.this.getNodeManager(), map);
try {
workQueue.scheduleAndSync(ginmw);
} catch (Exception e) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index 26245e1..ab436ed 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadFactory;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
@@ -41,7 +42,6 @@
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.application.ServiceContext;
import org.apache.hyracks.control.common.context.ServerContext;
-import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
import org.apache.hyracks.control.common.work.IResultCallback;
public class CCServiceContext extends ServiceContext implements
ICCServiceContext {
@@ -58,8 +58,8 @@
private final ClusterControllerService ccs;
public CCServiceContext(ClusterControllerService ccs, ServerContext
serverCtx, ICCContext ccContext,
- IApplicationConfig appConfig) throws IOException {
- super(serverCtx, appConfig, new
HyracksThreadFactory("ClusterController"));
+ IApplicationConfig appConfig, ThreadFactory
threadFactory) throws IOException {
+ super(serverCtx, appConfig, threadFactory);
this.ccContext = ccContext;
this.ccs = ccs;
initPendingNodeIds = new HashSet<>();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
index 1ee2315..2aa60ac 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/application/ServiceContext.java
@@ -31,7 +31,7 @@
public abstract class ServiceContext implements IServiceContext {
protected final ServerContext serverCtx;
protected final IApplicationConfig appConfig;
- protected ThreadFactory threadFactory;
+ protected final ThreadFactory threadFactory;
protected Serializable distributedState;
protected IMessageBroker messageBroker;
protected IJobSerializerDeserializerContainer jobSerDeContainer = new
JobSerializerDeserializerContainer();
@@ -65,11 +65,6 @@
@Override
public ThreadFactory getThreadFactory() {
return threadFactory;
- }
-
- @Override
- public void setThreadFactory(ThreadFactory threadFactory) {
- this.threadFactory = threadFactory;
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
index 5afc98d..f180d18 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java
@@ -20,6 +20,7 @@
import java.lang.management.ManagementFactory;
import java.util.Arrays;
+import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -29,6 +30,7 @@
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.job.resource.NodeCapacity;
+import org.apache.hyracks.api.lifecycle.ILifeCycleComponentManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.ControllerConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
@@ -41,6 +43,11 @@
}
@Override
+ public ThreadFactory fixupThreadFactory(ThreadFactory orig, ILifeCycleComponentManager lifeCycleComponentManager) {
+ return orig;
+ }
+
+ @Override
public void start(IServiceContext ncAppCtx, String[] args) throws Exception {
if (args.length > 0) {
throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args));
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b52675c..53197b9 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -37,6 +37,7 @@
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
@@ -69,6 +70,7 @@
import org.apache.hyracks.control.common.ipc.ClusterControllerRemoteProxy;
import org.apache.hyracks.control.common.ipc.IControllerRemoteProxyIPCEventListener;
import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
+import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
import org.apache.hyracks.control.common.utils.PidHelper;
import org.apache.hyracks.control.common.work.FutureValue;
import org.apache.hyracks.control.common.work.WorkQueue;
@@ -364,9 +366,10 @@
}
private void startApplication() throws Exception {
- serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig());
+ ThreadFactory threadFactory = application.fixupThreadFactory(new HyracksThreadFactory(id), lccm);
+ serviceCtx = new NCServiceContext(this, serverCtx, ioManager, id, memoryManager, lccm, ncConfig.getAppConfig(), threadFactory);
+ executor = Executors.newCachedThreadPool(threadFactory);
application.start(serviceCtx, ncConfig.getAppArgsArray());
- executor = Executors.newCachedThreadPool(serviceCtx.getThreadFactory());
}
public void updateMaxJobId(JobId jobId) {
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
index dc0bf0c..7f65a42 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/application/NCServiceContext.java
@@ -19,8 +19,8 @@
package org.apache.hyracks.control.nc.application;
import java.io.IOException;
-import java.io.OutputStream;
import java.io.Serializable;
+import java.util.concurrent.ThreadFactory;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IStateDumpHandler;
@@ -31,7 +31,6 @@
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.common.application.ServiceContext;
import org.apache.hyracks.control.common.context.ServerContext;
-import org.apache.hyracks.control.common.utils.HyracksThreadFactory;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.resources.memory.MemoryManager;
@@ -45,22 +44,16 @@
private final NodeControllerService ncs;
private IChannelInterfaceFactory messagingChannelInterfaceFactory;
- public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager,
- String nodeId, MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
- IApplicationConfig appConfig) throws IOException {
- super(serverCtx, appConfig, new HyracksThreadFactory(nodeId));
+ public NCServiceContext(NodeControllerService ncs, ServerContext serverCtx, IOManager ioManager, String nodeId,
+ MemoryManager memoryManager, ILifeCycleComponentManager lifeCyclecomponentManager,
+ IApplicationConfig appConfig, ThreadFactory threadFactory) throws IOException {
+ super(serverCtx, appConfig, threadFactory);
this.lccm = lifeCyclecomponentManager;
this.nodeId = nodeId;
this.ioManager = ioManager;
this.memoryManager = memoryManager;
this.ncs = ncs;
- sdh = new IStateDumpHandler() {
-
- @Override
- public void dumpState(OutputStream os) throws IOException {
- lccm.dumpState(os);
- }
- };
+ sdh = os -> lccm.dumpState(os);
}
@Override
diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
index ee74b75..5e362ff 100644
--- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestNCServiceContext.java
@@ -110,10 +110,6 @@
}
@Override
- public void setThreadFactory(ThreadFactory threadFactory) {
- }
-
- @Override
public IApplicationConfig getAppConfig() {
return null;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2004
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id30415325047008c013e305ca11ccbb76bc7d8d8
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>